1: #line 3 "./lpsrc/flx_posix_demux.ipk" 2: #ifndef __FLX_DEMUX_POSIX_DEMUXER_H__ 3: #define __FLX_DEMUX_POSIX_DEMUXER_H__ 4: 5: // base classes for posix style demuxers 6: 7: #include "demux_demuxer.hpp" 8: 9: namespace flx { namespace demux { 10: class DEMUX_EXTERN posix_demuxer; // fwd decl 11: 12: // abc 13: class DEMUX_EXTERN posix_wakeup { 14: public: 15: virtual ~posix_wakeup() {} 16: 17: // when called, the wakeup has finished and been removed. 18: virtual void wakeup(posix_demuxer& demux) = 0; 19: }; 20: 21: class DEMUX_EXTERN socket_wakeup : public posix_wakeup { 22: public: 23: int s; // the non blocking socket 24: int wakeup_flags; // set on wakeup, r/w or both 25: 26: socket_wakeup() : s(-1) {} 27: }; 28: 29: class DEMUX_EXTERN posix_demuxer : public demuxer { 30: protected: 31: void async_quit(); // useful for requesting wait thread quit in 32: // thread safe demuxer destructors. doesn't throw. 33: 34: public: 35: virtual ~posix_demuxer(); 36: 37: // posix style sockets. for reading and writing (but not both at once 38: // for the same socket_wakeup) you are guaranteed to receive only one 39: // wakeup per call to this function when you call wait. 40: // returns -1 if no wakeup is coming and zero if one is. 41: // For simultaneous reading and writing you may get two wakeups, 42: // that is, it may violate the "one shot" rule. Ignoring for now, 43: // as it's not a common use. This makes it undefined behaviour. 44: // wakeup is owned by the demuxer until its wakeup is called, 45: // so treat it with kid gloves, i.e. don't mess with it. 46: virtual int add_socket_wakeup(socket_wakeup* sv, int flags) = 0; 47: 48: // to be called when we can read & write without blocking 49: // return true if connection closed, update pb 50: // sort of a strange place to have this..., more a socket wakeup 51: // thing, even if static 52: static bool socket_recv(int s, sel_param* pb); 53: static bool socket_send(int s, sel_param* pb); 54: }; 55: 56: // some handy control blocks for common non-blocking socket operations 57: // note that they "fortuitously" both have start methods. hmm. 58: // a socket io one could be handy here. 59: 60: // this one's restartable (makes sense for listener sockets) 61: class DEMUX_EXTERN accept_control_block : public socket_wakeup { 62: public: 63: int accepted; // accepted socket (out) 64: int socket_err; // the error, if acceptee == -1, else 0 (out) 65: 66: accept_control_block() : accepted(-1), socket_err(0) {} 67: 68: virtual int start(posix_demuxer& demux); 69: virtual void wakeup(posix_demuxer& demux); 70: }; 71: 72: class DEMUX_EXTERN connect_control_block : public socket_wakeup { 73: public: 74: int socket_err; // outgoing error (on start or wake) 75: // this should probably be a sockaddr type 76: const char* addy; // addr (dotted quad) (in) 77: int p; // port (in) 78: 79: connect_control_block() : socket_err(0) {} 80: 81: virtual int start(posix_demuxer& demux); 82: virtual void wakeup(posix_demuxer& demux); 83: 84: // oops, can't check for s != -1 as it's always there. 85: // was always "finished" and so I started io, losing the first wakeup 86: // on epoll evtsrc. Is this right, or should it be != EINPROGRESS? 87: // keep in sync with iocp version. give socket_err initial definition 88: // that works with this? 89: bool finished() { return ( 0 == socket_err); } 90: }; 91: 92: }} // namespace demux, flx 93: #endif 94:
1: #line 98 "./lpsrc/flx_posix_demux.ipk" 2: #include "demux_posix_demuxer.hpp" 3: #include "demux_sockety.hpp" 4: #include "demux_quitter.hpp" // fns for waking and quitting a demuxer 5: 6: #include <stdio.h> // "printf" 7: #include <assert.h> // assert 8: #include <string.h> // strerror 9: #include <unistd.h> // close 10: 11: #include <sys/types.h> // send/recv 12: #include <sys/socket.h> 13: 14: //#include <sys/errno.h> 15: #include <errno.h> // GUSI & solaris prefer this 16: 17: namespace flx { namespace demux { 18: 19: posix_demuxer::~posix_demuxer() 20: { 21: } 22: 23: bool 24: posix_demuxer::socket_recv(int s, sel_param* pb) 25: { 26: // why do I have the zero buffer size? 27: assert(pb->buffer_size > pb->bytes_written || 0 == pb->buffer_size); 28: ssize_t nbytes; 29: 30: // if this were read then this fn would work with non-sockets 31: nbytes = recv(s, pb->buffer + pb->bytes_written, 32: pb->buffer_size - pb->bytes_written, 0); 33: 34: /* 35: fprintf(stderr,"posix_demuxer RECV: s=%d, pb=%p, buf+%d, req=%d, got %d\n", 36: s,pb, int(pb->bytes_written), int(pb->buffer_size - pb->bytes_written), int(nbytes) 37: ); 38: */ 39: if(nbytes <= 0) 40: { 41: if(nbytes == 0) 42: { 43: return true; // connection closed 44: } 45: else 46: { 47: perror("recv"); // can get reset connection here 48: return true; // so say closed, yeah? 49: } 50: } 51: else 52: { 53: // got some data 54: pb->bytes_written += nbytes; 55: } 56: return false; // connection didn't close 57: } 58: 59: bool 60: posix_demuxer::socket_send(int s, sel_param* pb) 61: { 62: // kqueue (and some of the other ones) can let you know know how much 63: // to write... imagine that! 64: 65: // why do I have the zero buffer size? 66: assert(pb->buffer_size > pb->bytes_written || 0 == pb->buffer_size); 67: 68: ssize_t nbytes; 69: 70: nbytes = send(s, pb->buffer + pb->bytes_written, 71: pb->buffer_size - pb->bytes_written, 0); 72: 73: /* 74: fprintf(stderr,"posix_demuxer SEND: s=%d, pb=%p buf+%d, req=%d, got %d\n", 75: s,pb, int(pb->bytes_written), int(pb->buffer_size - pb->bytes_written), int(nbytes) 76: ); 77: */ 78: // similar story here, with send vs write? 79: 80: // what's the story with zero? Is that allowed or does it signal 81: // that the connection closed? 82: if(-1 == nbytes) 83: { 84: perror("send"); 85: return true; // I guess the connection closed 86: } 87: else 88: { 89: // sent some data 90: pb->bytes_written += nbytes; 91: } 92: return false; // connection didn't close 93: } 94: 95: // get a posix demuxer to quit, that is, get the demuxer's event thread 96: // to exit. doesn't return until this has happened. pretty sure that 97: // calling this on a demuxer used synchronously would be a bad idea. 98: // doesn't throw 99: void 100: posix_demuxer::async_quit() 101: { 102: try { 103: // NEW and IMPROVED!!! demux quitter which sets demux quit flag 104: // via self pipe trick then waits for self pipe/quitting callback 105: // to finish. no fear of quitter being destructed early! 106: // fprintf(stderr, "async_quit called on posix demuxer\n"); 107: demux_quitter quitter; 108: quitter.quit(this); 109: // event thread has exited at this point 110: } catch(...) { 111: fprintf(stderr, "error waking demuxer with self pipe quitter\n"); 112: } 113: } 114: 115: #if 0 116: //nbytes = recv(s, pb->buffer + pb->bytes_written, 117: // pb->buffer_size - pb->bytes_written, 0); 118: 119: // select and kqueue know when non socket fds have data. 120: // recv only works with sockets, but read works with both files 121: // and sockets and who knows what else. is there any disadvantage 122: // to using read instead? apart from losing flags arg? 123: // does read get the same 0 bytes = close behaviour 124: nbytes = read(s, pb->buffer + pb->bytes_written, 125: pb->buffer_size - pb->bytes_written); 126: #endif 127: 128: // handy posix control blocks for accept, connect. 129: 130: int 131: accept_control_block::start(posix_demuxer& demux) 132: { 133: // add listener to demuxer as reading socket - see man 2 accept 134: // returns 0 on success, -1 on failure. not sure how to communicate 135: // the error. 136: // could try the accept now, to see if it succeeds instantly... 137: // observe wakeup rules (formulate them first) 138: accepted = -1; 139: // socket_err = 0; 140: // not quite true, but I want it to be clear if this ever becomes possible 141: // to do immediately 142: socket_err = EINPROGRESS; 143: return demux.add_socket_wakeup(this, PDEMUX_READ); 144: } 145: 146: // one wakeup socket is in accepted and error in socket_err 147: void 148: accept_control_block::wakeup(posix_demuxer& demux) 149: { 150: // fprintf(stderr,"accept_control_block woke up\n"); 151: 152: // we can now accept without blocking 153: // s is the listener, ambiguously named in parent socket_wakeup class 154: accepted = nice_accept(s, &socket_err); 155: 156: if(accepted == -1) 157: { 158: fprintf(stderr, "nice_accept failed, err (%i)\n", socket_err); 159: } 160: } 161: 162: // returns -1 on failure, 0 on success. on success the call is finished 163: // (and so no wakeup) if socket_err == 0. 164: int 165: connect_control_block::start(posix_demuxer& demux) 166: { 167: // fprintf(stderr,"async connect start\n"); 168: 169: int finished; 170: 171: // returns either finished and err, or not finished 172: // and (no err || EINPROGRESS) 173: s = async_connect(addy, p, &finished, &socket_err); 174: 175: // fprintf(stderr,"async_connect returned s: %i, finished: %i, err=%i\n", 176: // s, finished, socket_err); 177: 178: if(-1 == s) // failed! 179: { 180: fprintf(stderr,"async_connect failed (%i)\n", socket_err); 181: return -1; // error in socket_err, no wakeup 182: } 183: 184: if(finished) 185: { 186: // this actually happens on solaris when connecting to localhost! 187: fprintf(stderr,"async_connect finished immediately, waking\n"); 188: fprintf(stderr, "No wakeup coming...\n"); 189: // this does not indicate an error, but that there is no wakeup 190: // coming. this could be done by a wakeup, all that happens is 191: // getsockopt is called to check the socket's error state. 192: return -1; 193: } 194: 195: // fprintf(stderr,"connect_request didn't finish immediatly, sleeping\n"); 196: 197: // add to demuxer as writing socket - see man 2 connect 198: // how do they get the error? 199: return demux.add_socket_wakeup(this, PDEMUX_WRITE); 200: } 201: 202: void 203: connect_control_block::wakeup(posix_demuxer& demux) 204: { 205: // fprintf(stderr,"connect woke up\n"); 206: // this is how we check the success of async connects 207: // if get_socket_err fails, we're treating its errno as the socket's... 208: if(get_socket_error(s, &socket_err) == -1) 209: fprintf(stderr, "eep - get_socket_err failed!\n"); 210: 211: // failed, throw away socket 212: if(0 != socket_err) 213: { 214: fprintf(stderr,"async connect error: %s (%i), closing\n", 215: strerror(socket_err), socket_err); 216: // we created the connect socket, so we close it too. 217: if(close(s) != 0) 218: perror("async socket close"); 219: 220: s = -1; // the result 221: } 222: 223: // resulting connected socket in s 224: } 225: }} 226:
1: #line 325 "./lpsrc/flx_posix_demux.ipk" 2: #ifndef __FLX_DEMUX_PFILEIO_H__ 3: #define __FLX_DEMUX_PFILEIO_H__ 4: #include <flx_demux_config.hpp> 5: 6: #include "demux_demuxer.hpp" 7: #include "pthread_sleep_queue.hpp" 8: #include "pthread_mutex.hpp" 9: // #include <sys/types.h> // off_t (don't have flx iface to this yet) 10: // can just add new constructor 11: #include "pthread_work_fifo.hpp" 12: namespace flx { namespace demux { 13: 14: // ******************************************************** 15: /// like another event source. this is basically a wrapped pread, pwrite 16: /// should probably be derived from posix_wakeup or something like that. 17: /// or have the same signature. abstract - users overload "finished 18: // ******************************************************** 19: class DEMUX_EXTERN fileio_request : public flx::pthread::worker_task 20: { 21: long offset; // make this a proper offset (64bit) 22: // off_t offset; // in: offset, for use with pread, pwrite 23: int fd; // in: fd in question 24: bool read_flag; // in: read else write 25: 26: int err; // out: 27: public: 28: // public so it can be got in felix 29: sel_param pb; // in & out: what you want, what you get (64bit len?) 30: 31: virtual ~fileio_request(); // c++ should do this automatically 32: fileio_request(); // flx linkage 33: fileio_request(int f, char* buf, long len, long off, bool rd); 34: 35: virtual void doit(); // sync 36: }; 37: 38: }} // namespace demux, flx 39: #endif // __PFILEIO__ 40:
1: #line 366 "./lpsrc/flx_posix_demux.ipk" 2: #include <stdio.h> // printf 3: #include <errno.h> // errno 4: #include "demux_pfileio.hpp" 5: 6: // blocking reads & writes that use a worker fifo. users overload 7: // finished flag to implement wakeup 8: 9: // if we could group the requests, we could do a scattered read 10: // or we could do single reads if the requests were of a similar 11: // nature, i.e. the whole file, of popular files. 12: 13: // for pwrite/pread, I'm supposed to include the following three (osx man page) 14: // they don't appear to be necessary, but let's play it safe 15: #include <sys/types.h> 16: #include <sys/uio.h> 17: #include <unistd.h> 18: 19: namespace flx { namespace demux { 20: // fileio_request stuff follows 21: 22: // read or write in a blocking fashion. I like the idea of using pread 23: // which doesn't change the file pointer. this could allow reuse of the same 24: // file descriptor & block caching 25: 26: fileio_request::~fileio_request(){} 27: fileio_request::fileio_request(){} 28: 29: fileio_request::fileio_request(int f, char* buf, long len, long off, bool rd) 30: : offset(off), fd(f), read_flag(rd), err(0) 31: { 32: pb.buffer = buf; 33: pb.buffer_size = len; 34: pb.bytes_written = 0; 35: } 36: 37: // synchronously process read/write 38: void 39: fileio_request::doit() 40: { 41: /* 42: fprintf(stderr,"faio about to try to %s %i bytes from fd=%i\n", 43: (read_flag) ? "read" : "write", pb.buffer_size, fd); 44: */ 45: 46: // switching off (explicit) seeks for now because I'm not using them 47: // in the flx code & I'm not passing around enough info (just the fd) 48: ssize_t res; 49: 50: if(read_flag) 51: { 52: // res = pread(fd, pb.buffer, pb.buffer_size, offset); 53: res = read(fd, pb.buffer, pb.buffer_size); 54: } 55: else 56: { 57: // res = pwrite(fd, pb.buffer, pb.buffer_size, offset); 58: res = write(fd, pb.buffer, pb.buffer_size); 59: } 60: 61: // zero return value indicates end of file. that should just work. 62: if(-1 == res) 63: { 64: err = errno; // grab errno 65: fprintf(stderr,"faio error: %i\n", err); 66: } 67: else 68: { 69: // fprintf(stderr,"faio %s %i bytes\n", (read_flag) ? "read" : "write", res); 70: pb.bytes_written = res; 71: } 72: } 73: }} 74:
1: #line 441 "./lpsrc/flx_posix_demux.ipk" 2: 3: #ifndef __FLX_DEMUX_SELF_PIPER_H__ 4: #define __FLX_DEMUX_SELF_PIPER_H__ 5: 6: #include <flx_demux_config.hpp> 7: #include "demux_posix_demuxer.hpp" 8: 9: namespace flx { namespace demux { 10: 11: // there's no standard posix_socketio_wakeup, could be handy. could also 12: // perhaps use it here? this is a pipe, not a socket. not sure if recv nor 13: // send work on it, besides want to read an unlimited amount of redundant data. 14: class DEMUX_EXTERN selfpipe_wakeup : public socket_wakeup { 15: public: 16: demux_callback* cb; // optional callback 17: 18: virtual void wakeup(posix_demuxer& demux); 19: }; 20: 21: class DEMUX_EXTERN auto_fd { 22: public: 23: int fd; 24: 25: auto_fd(); 26: ~auto_fd(); 27: }; 28: 29: // make portable here? make part of the wakeup obj? 30: class DEMUX_EXTERN pipe_pair { 31: // self pipe trick!!! fd[0] = read end, fd[1] = write end. 32: auto_fd fds[2]; 33: public: 34: pipe_pair(); 35: // void read_byte(); // done for us by wakeup obj. 36: void write_byte(); 37: int get_read_end(); 38: }; 39: 40: // wakes a POSIX demuxer, for when you want some kind of attention 41: // todo: make portable 42: class DEMUX_EXTERN self_piper { 43: pipe_pair pp; 44: selfpipe_wakeup spw; 45: public: 46: void install(demuxer* demux, demux_callback* cb = 0); 47: void wake(); 48: }; 49: 50: }} // namespace demux, flx 51: 52: #endif
1: #line 494 "./lpsrc/flx_posix_demux.ipk" 2: 3: #include "demux_self_piper.hpp" 4: #include <stdio.h> // printf, perror 5: #include <unistd.h> // pipe for self-pipe trick. 6: #include <assert.h> 7: 8: namespace flx { namespace demux { 9: 10: auto_fd::auto_fd() 11: { 12: fd = -1; // invalid 13: } 14: 15: auto_fd::~auto_fd() 16: { 17: if(-1 == fd) return; 18: 19: if(close(fd) == -1) 20: perror("auto fd close"); 21: } 22: 23: void 24: self_piper::install(demuxer* d, demux_callback* cb) 25: { 26: //fprintf(stderr, "installing self piper in %p with cb=%p\n", d, cb); 27: posix_demuxer* demux = static_cast<posix_demuxer*>(d); 28: spw.s = pp.get_read_end(); 29: spw.cb = cb; 30: 31: int res = demux->add_socket_wakeup(&spw, PDEMUX_READ); 32: assert(-1 != res); 33: } 34: 35: // wake the demuxer referenced in install 36: void 37: self_piper::wake() 38: { 39: // fprintf(stderr, "self_piper::wake\n"); 40: pp.write_byte(); 41: } 42: 43: void 44: selfpipe_wakeup::wakeup(posix_demuxer& demux) 45: { 46: // fprintf(stderr, "selfpipe wakeup: read the pending byte and re-arm\n"); 47: // not using the pipe pair because it doesn't know that it's part of 48: // one. not to worry. 49: ssize_t nbytes; 50: char b; 51: 52: // if this were read then this fn would work with non-sockets 53: // EH? It IS read. 54: nbytes = read(s, &b, 1); 55: 56: if(nbytes == -1) perror("read"); 57: 58: // fprintf(stderr, "GOT: %li, %x\n", nbytes, b); 59: assert(nbytes == 1 && b == 1); 60: 61: // callback! 62: if(cb) cb->callback(&demux); 63: 64: // add self back! this happens even when we're quitting, but that 65: // doesn't seem to matter. 66: // fprintf(stderr, "selfpiper rearming\n"); 67: int res = demux.add_socket_wakeup(this, PDEMUX_READ); 68: assert(-1 != res); 69: } 70: 71: pipe_pair::pipe_pair() 72: { 73: // fprintf(stderr, "creating pipe for self-pipe trick\n"); 74: 75: int self_pipe_fds[2]; 76: if(pipe(self_pipe_fds) == -1) 77: { 78: perror("ts_select_demuxer::self_pipe"); 79: throw -1; 80: } 81: 82: // fprintf(stderr, "self pipe fds: read: %i, write: %i\n", 83: // self_pipe_fds[0], self_pipe_fds[1]); 84: 85: fds[0].fd = self_pipe_fds[0]; 86: fds[1].fd = self_pipe_fds[1]; 87: } 88: 89: void 90: pipe_pair::write_byte() 91: { 92: char b = 1; 93: ssize_t nbytes; 94: // is this blocking? I guess it has to be... 95: nbytes = write(fds[1].fd, &b, 1); // wake up, jeff! 96: 97: // fprintf(stderr, "self_piper::wake write returned: %i\n", nbytes); 98: 99: if(-1 == nbytes) perror("pipe_pair::write_byte"); 100: assert(1 == nbytes); 101: } 102: 103: int 104: pipe_pair::get_read_end() 105: { 106: return fds[0].fd; 107: } 108: 109: } } 110:
1: #line 605 "./lpsrc/flx_posix_demux.ipk" 2: #ifndef __FLX_DEMUX_SOCKETY_H__ 3: #define __FLX_DEMUX_SOCKETY_H__ 4: #include <flx_demux_config.hpp> 5: namespace flx { namespace demux { 6: 7: // Shouldn't this all be DEMUX_EXTERN? eh, doesn't happen on win32 8: // we'll probably live. 9: int create_listener_socket(int* io_port, int q_len); 10: int create_async_listener(int* io_port, int q_len); 11: int nice_accept(int listener, int* err); 12: int nice_connect(const char* addr, int port); 13: int async_connect(const char* addr, int port, int* finished, int* err); 14: 15: /* handy socket building blocks */ 16: 17: int connect_sock(int s, const char* addr, int port); 18: 19: /* this could possibly do with NIC addr as well as port */ 20: int bind_sock(int s, int* io_port); 21: 22: int make_nonblock(int s); 23: int make_linger(int s, int t); 24: int set_tcp_nodelay(int s, int disable_nagle); 25: int get_socket_error(int s, int* socket_err); 26: 27: }} // namespace demux, flx 28: #endif 29:
1: #line 3 "./lpsrc/flx_epoll_demux.ipk" 2: #ifndef __FLX_DEMUX_EPOLL_DEMUXER_H__ 3: #define __FLX_DEMUX_EPOLL_DEMUXER_H__ 4: 5: #include <flx_demux_config.hpp> 6: #include "demux_posix_demuxer.hpp" 7: 8: namespace flx { namespace demux { 9: // epoll allows only one event per socket - it does not differentiate 10: // on the awaited operation (read/write), however it does let you wait 11: // on any combination (I think) 12: 13: // ******************************************************** 14: /// epoll based demuxer 15: // ******************************************************** 16: 17: class DEMUX_EXTERN epoll_demuxer : public posix_demuxer { 18: int epoll_fd; 19: 20: // be careful of this - don't let it create race conditions 21: // should probably only be called by wait = in one thread only (check) 22: // this removes ALL outstanding events for s. 23: void remove_wakeup(int s); 24: 25: virtual void get_evts(bool poll); 26: public: 27: epoll_demuxer(); 28: virtual ~epoll_demuxer(); 29: 30: virtual int add_socket_wakeup(socket_wakeup* sv, int flags); 31: }; 32: 33: }} // namespace demux, flx 34: #endif 35:
1: #line 39 "./lpsrc/flx_epoll_demux.ipk" 2: // epoll interface. does epoll support ordinary files in addition to sockets? 3: // EPOLLET to make epoll edgetriggered. I guess the default is level triggered. 4: 5: // epoll events are not one shot, in fact they're quite sticky so socket 6: // filters must be removed manually to guarantee a one-to-one wakeup 7: // to add_wakeup ratio. note that the oneshot flag is not a solution. 8: 9: // cool! EPOLLONESHOT 10: // BUGGER! doesn't seem to exist! and doing this doesn't make it so! 11: // #ifndef EPOLLONESHOT 12: // #define EPOLLONESHOT (1<<30) 13: // #endif 14: 15: #include "demux_epoll_demuxer.hpp" 16: 17: #include <sys/epoll.h> // for epoll_* 18: #include <stdio.h> // for perror 19: #include <unistd.h> // for close 20: #include <errno.h> // EEXIST, errno 21: 22: namespace flx { namespace demux { 23: 24: epoll_demuxer::epoll_demuxer() 25: : epoll_fd(-1) 26: { 27: // EPOLLONESHOT is shit, don't use it. Enabling it just means that your 28: // wakeups are suppressed and you have to use EPOLL_CTL_MOD instead 29: // of EPOLL_CTL_ADD. If it isn't defined then so much the better. 30: //#ifdef EPOLLONESHOT 31: // fprintf(stderr,"WARNING: EPOLLONESHOT AVAILABLE (%x)!!!\n", EPOLLONESHOT); 32: //#endif 33: 34: // god knows what the maximum size will be, I'll just say 1 for now 35: epoll_fd = epoll_create(1); 36: if(-1 == epoll_fd) 37: { 38: perror("epoll_create"); 39: throw -1; 40: } 41: } 42: 43: epoll_demuxer::~epoll_demuxer() 44: { 45: async_quit(); // get waiting thread to exit. 46: 47: if(-1 != epoll_fd) 48: { 49: if(close(epoll_fd) != 0) 50: perror("epoll close"); 51: } 52: } 53: 54: int 55: epoll_demuxer::add_socket_wakeup(socket_wakeup* sv, int flags) 56: { 57: int s = sv->s; 58: 59: struct epoll_event evt; 60: // fprintf(stderr,"add_socket_wakeup: %i (sv=%p, flags=%x)\n", 61: // s, sv, flags); 62: 63: // EPOLLONESHOT saves us not only a system call to remove epoll evts, 64: // which aren't intrinsically one-shot, but having to do it ourselves 65: // would have been a pain as epoll doesn't tell you which fd had the event 66: // this way we can get away with not knowing & not losing our user cookie 67: evt.events = 0; 68: 69: if(flags & PDEMUX_READ) evt.events |= EPOLLIN; 70: if(flags & PDEMUX_WRITE) evt.events |= EPOLLOUT; 71: 72: // fprintf(stderr, "flags %x -> evt.events %x\n", flags, evt.events); 73: 74: // We do the remove manually because oneshot in epoll doesn't 75: // remove the socket, but rather, disables it. 76: //#ifdef EPOLLONESHOT 77: // evt.events |= EPOLLONESHOT; // yes! 78: //#endif 79: // I think EPOLLHUP comes when the connection closes (on read?) 80: // poll's (plain old poll) equivalents to this are ignored for input 81: // same here? 82: // I get EPOLLHUPs for bad async connects whether I ask for them or not. 83: evt.events |= (EPOLLHUP | EPOLLERR); // I think I want this 84: 85: evt.data.ptr = sv; // our user data 86: 87: if(epoll_ctl(epoll_fd, EPOLL_CTL_ADD, s, &evt) == -1) 88: { 89: // EPOLL_CTL_MOD cannot help us do bidirection io on one socket, 90: // the mod will overwrite the old user cookie and direction. 91: // It seems that only kqueues, select and iocps allow that. 92: // Will need a wakeup that can do both, oneshot that indicates 93: // the available direction. 94: // when using oneshot, we're supposed to use EPOLL_CTL_MOD 95: #if 0 96: int err = errno; 97: 98: if(EEXIST == err) 99: { 100: // ok - let's try EPOLL_CTL_MOD 101: fprintf(stderr, "RETRYING WITH EPOLL_CTL_MOD\n"); 102: if(epoll_ctl(epoll_fd, EPOLL_CTL_MOD, s, &evt) != -1) 103: return 0; // good! 104: } 105: #endif 106: perror("epoll_ctl (add)"); 107: 108: return -1; 109: } 110: return 0; 111: } 112: 113: // epoll doesn't differentiate on events. I bet I could 114: // just not pass that event... 115: void 116: epoll_demuxer::remove_wakeup(int s) 117: { 118: // EPOLL_CTL_DEL uses no information from the event 119: // and so I should be able to pass NULL. 120: struct epoll_event evt; 121: // evt.events = (read) ? EPOLLIN : EPOLLOUT; 122: 123: // fprintf(stderr,"removing socket wakeup %i\n", s); 124: 125: if(epoll_ctl(epoll_fd, EPOLL_CTL_DEL, s, &evt) == -1) 126: { 127: //const char* str = (read) ? "epoll_ctl (remove read)" 128: // : "epoll_ctl (remove write)"; 129: // perror(str); 130: perror("epoll_ctl (remove)"); 131: } 132: } 133: 134: void 135: epoll_demuxer::get_evts(bool poll) 136: { 137: struct epoll_event evt; 138: 139: switch(epoll_wait(epoll_fd, &evt, 1, (poll) ? 0 : ~0)) 140: { 141: case -1: // error 142: perror("epoll_wait"); 143: // fall through 144: case 0: // no events (happens with timeout) 145: return; 146: } 147: 148: socket_wakeup* sv = (socket_wakeup*)evt.data.ptr; 149: 150: // not seeing timeouts as they're filtered by the switching. 151: // assuming that sv is good 152: // fprintf(stderr,"wakeup (sv=%p, sv->s=%i evt.events=%x)!\n", 153: // sv, sv->s, evt.events); 154: 155: // accumulate bit field of what we got 156: sv->wakeup_flags = 0; 157: 158: bool wake = false; 159: 160: // it might be possible to get both a read & write event... 161: // in which case I should take out the else below 162: if(evt.events & EPOLLIN) // I think this is how you do it 163: { 164: // fprintf(stderr,"EPOLLIN for %p\n", sv); 165: sv->wakeup_flags |= PDEMUX_READ; 166: wake = true; 167: } 168: 169: if(evt.events & EPOLLOUT) 170: { 171: //fprintf(stderr,"EPOLLOUT for %p\n", sv); 172: sv->wakeup_flags |= PDEMUX_WRITE; 173: wake = true; 174: } 175: 176: // Is this for shutdown or closing of the other end? 177: // I get it for failed async connects. I don't know if other events cause 178: // it. In any case, I don't know whether it should be for read or write, 179: // so I just don't say. In any case, it should wake to get error. 180: // I seem to get both EPOLLHUP and EPOLLERROR on bad async connect 181: // see poll demuxer notes on POLLHUP for further possibly useful info. 182: if(evt.events & EPOLLHUP) 183: { 184: fprintf(stderr, "EPOLLHUP for %p->%i\n", sv, sv->s); 185: sv->wakeup_flags |= PDEMUX_EOF; 186: wake = true; 187: } 188: 189: if(evt.events & EPOLLERR) 190: { 191: // How do I retrieve the error? 192: // There's no ambiguity - there's only ever one fd in a given epoll. 193: // If oneshot's present then don't need to do anything 194: // not sure what to do here. if we've enabled/got oneshot the socket 195: // should already have been removed 196: fprintf(stderr,"epoll error, waking: %i (errno?)\n", sv->s); 197: // similar story to EPOLLHUP 198: sv->wakeup_flags |= PDEMUX_ERROR; 199: wake = true; 200: } 201: 202: if((evt.events & ~(EPOLLERR|EPOLLIN|EPOLLOUT|EPOLLHUP))) 203: { 204: fprintf(stderr,"unknown events in epoll_demuxer %x\n", evt.events); 205: } 206: 207: // we got something. tell the people. 208: // not dependent solely on wakeup_flags - errors need to wake too. 209: if(wake) 210: { 211: // we got something. better call wakeup, must remove to guarantee 212: // 1-1 wakeups with add_sockets 213: // fprintf(stderr, "no one-shot... remove %i\n", sv->s); 214: remove_wakeup(sv->s); 215: // fprintf(stderr, "calling wakeup (flags=%x)\n", sv->wakeup_flags); 216: sv->wakeup(*this); 217: } 218: } 219: }} 220: 221:
1: #line 3 "./lpsrc/flx_evtport_demux.ipk" 2: #ifndef __FLX_DEMUX_EVTPORT_DEMUXER_H__ 3: #define __FLX_DEMUX_EVTPORT_DEMUXER_H__ 4: 5: // driver for solaris 10 event port notifications 6: 7: #include "demux_posix_demuxer.hpp" 8: 9: namespace flx { namespace demux { 10: 11: // Event ports are oneshot by default (I don't know if you can change that). 12: // Events are tracked only by fd and not fd*event, so you cannot add 13: // separate wakeups for read and write with the same fd and hope for it to 14: // work as the later one will overwrite the earlier, fodder for race 15: // conditions. This impl satisfies 1-1 wakeup to request ratio. 16: 17: // I don't know if evtports can be waited upon by other evtports 18: 19: // OBS. 20: // after removing the threads from the demuxers/event sources 21: // how are the two half demuxers supposed to work? They used to 22: // have three threads and now they have one. How can two waits be 23: // done in one thread? I could add one half_demuxer's evtport to 24: // the other's and wait on that. Would that work? Otherwise I'll 25: // have to start a thread, which screws things up a bit. Could do 26: // that and communicate back to single thread via a waitable queue. 27: // could have three half-demuxers, add them both to third and call 28: // their wait functions depending on the outer's wait result. 29: 30: class DEMUX_EXTERN evtport_demuxer : public posix_demuxer { 31: int evtport; 32: 33: // I think evtports only track socket the socket and not 34: // socket*operation, so there's only one remove 35: void remove_wakeup(int s); 36: 37: virtual void get_evts(bool poll); 38: public: 39: evtport_demuxer(); 40: virtual ~evtport_demuxer(); 41: 42: virtual int add_socket_wakeup(socket_wakeup* sv, int flags); 43: }; 44: 45: }} // namespace demux, flx 46: #endif 47:
1: #line 51 "./lpsrc/flx_evtport_demux.ipk" 2: // Evtports can get timer wakeups with PORT_SOURCE_TIMER. 3: // Can also pick up aio notifications with PORT_SOURCE_AIO. 4: 5: // looks like this stuff is only in solaris10, and not SunOS 5.8. Damn. 6: 7: #include "demux_evtport_demuxer.hpp" 8: 9: #include <port.h> 10: #include <poll.h> // POLLIN/POLLOUT 11: #include <stdio.h> // printf 12: #include <unistd.h> // close 13: #include <assert.h> 14: 15: namespace flx { namespace demux { 16: 17: // header files for this stuff? 18: // can use port_send for user defined events, to wake up reap loop 19: // truss to see what's happening 20: 21: evtport_demuxer::evtport_demuxer() 22: { 23: if((evtport = port_create()) < 0) 24: { 25: perror("port_create"); 26: throw -1; 27: } 28: } 29: 30: evtport_demuxer::~evtport_demuxer() 31: { 32: async_quit(); // gets waiting thread to quit, returning afterwards 33: 34: if(-1 != evtport) 35: { 36: if(close(evtport) != 0) 37: perror("evtport close"); 38: } 39: } 40: 41: int 42: evtport_demuxer::add_socket_wakeup(socket_wakeup* sv, int flags) 43: { 44: if(flags & ~(PDEMUX_READ|PDEMUX_WRITE)) // I can't understand you, lady! 45: return -1; 46: 47: int events = 0; // events are flags so we can accumulate them 48: int s = sv->s; 49: 50: if(flags & PDEMUX_READ) events |= POLLIN; 51: if(flags & PDEMUX_WRITE) events |= POLLOUT; 52: 53: // POLLHUP might make sense only for reads... add conditionally? 54: events |= (POLLHUP | POLLERR); 55: 56: // fprintf(stderr,"add_socket_wakeup: %i, sv: %p (%x)\n", s, sv, flags); 57: 58: // register for event we are interested in... 59: // works for files, sockets, timers... 60: // sockets are file descriptors in unix, so src is PORT_SOURCE_FD 61: if(port_associate(evtport, PORT_SOURCE_FD, (uintptr_t)s, events, sv) == -1) 62: { 63: perror("add socket_wakeup/port_associate"); 64: return -1; 65: } 66: 67: return 0; 68: } 69: 70: // note that these two functions are exactly the same 71: // we have to remove after a read or write else we can get multiple 72: // wakeups - usually with a dud user cookie. the fact that there is 73: // no differentiation between POLLIN & POLLOUT could be a problem for 74: // mixed read/write things (rare). note that evt_ports let me associate 75: // the samething twice. I don't know if this means you have to dissociate 76: // (disassociate) twice. 77: void 78: evtport_demuxer::remove_wakeup(int s) 79: { 80: if(port_dissociate(evtport, PORT_SOURCE_FD, s) == -1) 81: perror("reading port_dissociate"); 82: } 83: 84: #define POLLPR(ev) if(e->portev_events & ev) fprintf(stderr,#ev", ") 85: 86: static void 87: print_port_evt(port_event_t* e) 88: { 89: char* srcstr[PORT_SOURCE_ALERT-PORT_SOURCE_AIO+1] 90: = { "ALERT", "TIMER", "USER", "FD", "AIO"}; 91: fprintf(stderr,"e: %p\n\t", e); 92: //fprintf(stderr,"portev_events: %x\n\t", e->portev_events); 93: fprintf(stderr,"portev_events: "); 94: 95: // I got these constants from the poll.h file 96: POLLPR(POLLIN); POLLPR(POLLOUT); POLLPR(POLLPRI); 97: POLLPR(POLLRDNORM); POLLPR(POLLRDBAND); POLLPR(POLLWRBAND); 98: 99: // in poll these are in a different field. port_event_t doesn't 100: // have that field, so lets try here. 101: POLLPR(POLLERR); POLLPR(POLLHUP); POLLPR(POLLNVAL); POLLPR(POLLREMOVE); 102: 103: fprintf(stderr," (%x)\n\t", e->portev_events); 104: 105: int src = e->portev_source; 106: if(PORT_SOURCE_AIO <= src && src <= PORT_SOURCE_ALERT) 107: { 108: fprintf(stderr,"portev_source: PORT_SOURCE_%s (%x)\n\t", 109: srcstr[src-PORT_SOURCE_AIO], src); 110: } 111: else 112: { 113: fprintf(stderr,"portev_source: %x\n\t", e->portev_source); 114: } 115: 116: fprintf(stderr,"portev_pad: %x\n\t", e->portev_pad); 117: // often our socket 118: fprintf(stderr,"portev_object: %x\n\t", e->portev_object); 119: fprintf(stderr,"portev_user: %p\n", e->portev_user); 120: } 121: 122: void 123: evtport_demuxer::get_evts(bool poll) 124: { 125: // Block until a single event appears on the port. Event will not fire 126: // again, so we get max 1 wakeup per event added. 127: 128: port_event_t evt; 129: timespec timeout, *tp = NULL; 130: 131: if(poll) // effect a poll 132: { 133: timeout.tv_sec = 0; 134: timeout.tv_nsec = 0; 135: tp = &timeout; 136: } 137: 138: // wait for single event, no timeout 139: if(port_get(evtport, &evt, tp) == -1) 140: { 141: perror("port_get"); 142: return; 143: } 144: 145: // fprintf(stderr,"PORT_GET RETURNED: "); print_port_evt(&evt); 146: 147: // get wakeup obj tucked away in the user cookie. 148: socket_wakeup* sv = (socket_wakeup*)evt.portev_user; 149: int s = evt.portev_object; 150: 151: assert(sv != NULL); 152: if(evt.portev_source != PORT_SOURCE_FD) 153: { 154: // when polling I often end up in here - we get an unknown evt 155: // source and a POLLNVAL event plus lots of other unknown flags. 156: // there's interesting looking stuff in the user field and so on, 157: // but it's nothing of mine and also undocumented 158: // fprintf(stderr,"got non PORT_SOURCE_FD (s=%i, sv=%p, src=%i)\n", 159: // s, sv, evt.portev_source); 160: // fprintf(stderr, "skipping out...\n"); 161: return; 162: } 163: 164: 165: // let's see what we've got for the wakeup 166: sv->wakeup_flags = 0; 167: 168: if(evt.portev_events & POLLERR) 169: { 170: fprintf(stderr,"ERRORS on s = %i, sv = %p\n", s, sv); 171: //evt.portev_events &= ~POLLERR; 172: //return; 173: } 174: 175: // for bidirectional wakeups, we should be able to get both 176: // POLLIN and POLLOUT at the same time, but I've not yet 177: // seen it happen, they're coming in one at a time for me. 178: 179: 180: if(evt.portev_events & POLLIN) 181: { 182: // fprintf(stderr,"GOT POLLIN FOR %p\n", sv); 183: sv->wakeup_flags |= PDEMUX_READ; 184: } 185: 186: if(evt.portev_events & POLLOUT) 187: { 188: // fprintf(stderr,"GOT POLLOUT FOR %p\n", sv); 189: sv->wakeup_flags |= PDEMUX_WRITE; 190: } 191: 192: // I never asked for POLLERR, but anyway 193: if(evt.portev_events & ~(POLLIN | POLLOUT | POLLERR)) 194: { 195: fprintf(stderr,"UNSOLICITED events in evtport_demuxer (%x)\n", 196: evt.portev_events); 197: } 198: 199: assert(sv->wakeup_flags != 0); // we should've gotten SOMETHING. 200: 201: if(sv->wakeup_flags) 202: sv->wakeup(*this); 203: } 204: }} 205: 206:
1: #line 3 "./lpsrc/flx_kqueue_demux.ipk" 2: #ifndef __FLX_DEMUX_KQUEUE_DEMUXER_H__ 3: #define __FLX_DEMUX_KQUEUE_DEMUXER_H__ 4: 5: #include "demux_posix_demuxer.hpp" 6: 7: namespace flx { namespace demux { 8: 9: // ******************************************************** 10: /// kqueue demuxer for osx'n'BSD and at least 1 linux 11: // ******************************************************** 12: class DEMUX_EXTERN kqueue_demuxer : public posix_demuxer { 13: int kq; 14: protected: 15: // this could just be passed the socket_wakeup, if it stored 16: // the flags. Those flags are also set, though, which would 17: // create a race condition. In and out flags? 18: 19: int add_kqueue_filter(socket_wakeup* sv, short filter); 20: int remove_kqueue_filter(int s, short filter); 21: 22: int remove_socket_wakeup(int s, int flags); 23: void get_evts(bool poll); 24: public: 25: kqueue_demuxer(); 26: virtual ~kqueue_demuxer(); 27: 28: virtual int add_socket_wakeup(socket_wakeup* sv, int flags); 29: }; 30: 31: }} // namespace demux, flx 32: #endif 33:
1: #line 37 "./lpsrc/flx_kqueue_demux.ipk" 2: // kqueue demuxer for bsd/os x 3: // N.B. calling close on a file descriptor will remove any kevents that 4: // reference that descriptor. that would explain remove complaining from 5: // time to time. 6: // try EV_EOF to pick up eofs, useful for async file io. 7: 8: #include "demux_kqueue_demuxer.hpp" 9: 10: #include <stdio.h> // perror 11: #include <unistd.h> // close 12: 13: #include <sys/types.h> // from the kqueue manpage 14: #include <sys/event.h> // kernel events 15: #include <sys/time.h> // timespec (kevent timeout) 16: 17: // #include <sys/syscall.h> // syscall(SYC_close,kq) close workaround 18: 19: namespace flx { namespace demux { 20: kqueue_demuxer::kqueue_demuxer() 21: : kq(-1) 22: { 23: // Not that you care, but this event queue is not inherited by 24: // forked children. 25: kq = kqueue(); 26: if(-1 == kq) 27: { 28: perror("kqueue"); 29: throw -1; 30: } 31: } 32: 33: kqueue_demuxer::~kqueue_demuxer() 34: { 35: // calling close on a kq while a thread is waiting in kevent causes close 36: // to block! this happens on 10.4. Hard to say on 10.3 as close simply 37: // fails there. we need to wake the waiting thread, so we'll use that 38: // handy self pipe waker. Luckily, kqueues are responsive to new fds, 39: // otherwise we'd need the self pipe waker to be there from the start 40: // p.s. it's also bad form to destruct a demuxer while a thread waits 41: // on it. top marks to kqueues for making this obvious, passing fail 42: // to me for not applying the same to the other async demuxers. 43: async_quit(); 44: 45: //if(syscall(SYS_close, kq) == -1) 46: // I don't seem to be able to close a kq. can't fstat it either 47: if(-1 != kq && close(kq) == -1) 48: perror("kqueue close"); 49: } 50: 51: 52: // Events of interest to us ERead, EWrite. 53: // ERead has fflags: NOTE_LOWAT, NOTE_EOF. ident is a descriptor (any?) 54: 55: // if you're using the kqueue_demuxer to do a single biderectional wakeup, 56: // be aware that it currently breaks the "one shot" rule, that is you 57: // make get an unexpected wakeup the next time you call wait. 58: int 59: kqueue_demuxer::add_socket_wakeup(socket_wakeup* sv, int flags) 60: { 61: // we only know these flags 62: if((flags & ~(PDEMUX_READ | PDEMUX_WRITE))) return -1; 63: 64: // could set wakeup_flags here of what's been installed! 65: 66: // FUCK - can only do one at a time with kqueues 67: // For those doing both, if one fails, you're in a bit of trouble. 68: if(flags & PDEMUX_READ) 69: { 70: if(add_kqueue_filter(sv, EVFILT_READ) == -1) return -1; 71: } 72: 73: if(flags & PDEMUX_WRITE) 74: { 75: if(add_kqueue_filter(sv, EVFILT_WRITE) == -1) return -1; 76: } 77: 78: return 0; 79: } 80: 81: int 82: kqueue_demuxer::add_kqueue_filter(socket_wakeup* sv, short filter) 83: { 84: int s = sv->s; 85: struct kevent evt; 86: 87: // this works just like select if the s is a listening socket 88: // *except* works with all types of fds, including pipes, files & fifos 89: // can set low water mark for reads with NOTE_LOWAT in fflags and size 90: // in data. on return data contains number of bytes available to read 91: // on return sets EV_EOF in flags if read dir socket has shutdown and 92: // returns a possible socket err in fflags 93: // should that be EV_ENABLE | EV_ADD. fflags zero cos I don't know what 94: // to put there. pass pb in udata 95: 96: // adding EV_ONESHOT to save me removing on wakeup (a syscall). 97: // I now require that during the evt be removed before wakeup fn. 98: 99: EV_SET(&evt, s, filter, EV_ADD | EV_ONESHOT, 0, 0, sv); 100: // trying to detect when have reached eof with async file io using kq 101: //EV_SET(&evt, s, EVFILT_READ, EV_ADD, | EV_ONESHOT NOTE_LOWAT, 16*1024, sv); 102: 103: // add event 104: if(kevent(kq, &evt, 1, NULL, 0, NULL) < 0) 105: { 106: perror("kevent add_kqueue_filter"); 107: return -1; 108: } 109: return 0; 110: } 111: 112: // useful, but unused atm, thanks to ONESHOT. 113: // this function skirts the portability boundaries of the demux interface 114: // for kqueues each event monitor is identified (for sockets) by a pair, 115: // (s, filter), or for us, (s, {in|out}). This means that we can add 116: // individual wakeups for reading and writing but not both at once. 117: // I think epoll can do both, and so can select (and N/A to IOCPs). 118: // This "both at once" thing can't easily be one shot. There's a good 119: // case for this behaviour to be defined "undefined". Not many people 120: // using this part - could be caveat emptor... 121: int 122: kqueue_demuxer::remove_kqueue_filter(int s, short filter) 123: { 124: struct kevent evt; 125: 126: EV_SET(&evt, s, filter, EV_DELETE, 0, 0, NULL); 127: if(kevent(kq, &evt, 1, NULL, 0, NULL) < 0) 128: { 129: perror("kevent remove_socket_wakeup"); 130: return -1; 131: } 132: 133: return 0; 134: } 135: 136: int 137: kqueue_demuxer::remove_socket_wakeup(int s, int flags) 138: { 139: int r1 = 0, r2 = 0; 140: 141: if(flags & PDEMUX_READ) r1 = remove_kqueue_filter(s, EVFILT_READ); 142: if(flags & PDEMUX_WRITE) r2 = remove_kqueue_filter(s, EVFILT_WRITE); 143: 144: // If you want to know which one failed, you're out of luck. 145: if(r1 || r2) return -1; 146: 147: return 0; 148: } 149: 150: // from "advanced macos programming", on reading shutdown causes 151: // the EV_EOF flag to be set in the flags field and returns errno 152: // in the fflags field. There may still be pending data to read 153: // when EV_EOF is set. The data field says how many bytes available. 154: // for writing data says how much you can write. EV_EOF is set 155: // when the reader "disconnects". Says nothing about errno/fflags 156: // in this case. 157: /* 158: fprintf(stderr,"readevt on %i, EOF = %s\n", 159: s, (ev.flags & EV_EOF) ? "TRUE" : "FALSE"); 160: */ 161: 162: // do that thing where you get the events. can I get them one at a time? 163: // I bet I can. 164: void 165: kqueue_demuxer::get_evts(bool poll) 166: { 167: // event seems to remain unless we remove it 168: struct kevent ev; 169: int nevts; 170: 171: struct timespec timeout, *tptr = NULL; 172: 173: if(poll) 174: { 175: timeout.tv_sec = 0; // effectuate a poll 176: timeout.tv_nsec = 0; 177: tptr = &timeout; 178: } 179: 180: // timeout.tv_sec = 1; // timeout every second 181: // timeout.tv_nsec = 0; // 10^9 nanoseconds per second 182: 183: nevts = kevent(kq, NULL, 0, &ev, 1, tptr); // wait or poll 184: 185: if(nevts <= 0) 186: { 187: // error, else timeout & return to allow cancel 188: if(nevts < 0) 189: perror("kevent event fetch"); 190: 191: return; 192: } 193: 194: // fprintf(stderr,"kqueue wakeup!\n"); 195: 196: socket_wakeup* sv = (socket_wakeup*)ev.udata; 197: 198: // The filters are not bit fields, hence they must come in serially. 199: // this means you're never going to get both read and write on 200: // a kqueue_demuxer wake up. No worries. 201: if(ev.filter == EVFILT_READ) 202: { 203: // this capability is lost for the moment, as we have no way 204: // of explaining it to felix. the event stuff isn't so good right now 205: /* 206: // can chunk up on accepts. nice one kqueue 207: if(NULL == sv) // => listener 208: { 209: int backlog = (int)ev.data; 210: // fprintf(stderr,"kq listen backlog: %i\n", backlog); 211: for(int i = 0; i < backlog; i++) handle_connection(); 212: } 213: else 214: */ 215: // If a socket wakeup were a control block, you'd set the err here. 216: if(0 && ev.flags & EV_EOF) 217: { 218: // errno in fflags! 219: fprintf(stderr, 220: "got EV_EOF on read, %i bytes remain in buffer, errno=%i\n", 221: (int)ev.data, ev.fflags); 222: } 223: // fprintf(stderr,"EVFILT_READ: got %i bytes coming\n", (int)ev.data); 224: // remove_reading_fd(s); // now useing EV_ONESHOT 225: // remove other outstanding here...? 226: sv->wakeup_flags = PDEMUX_READ; // Tell 'em what they've won. 227: sv->wakeup(*this); 228: } 229: else if(ev.filter == EVFILT_WRITE) 230: { 231: // fprintf(stderr,"EVFILT_WRITE: can write (?) %i bytes\n", 232: // (int)ev.data); 233: 234: // using oneshot mode now. 235: // remove_writing_fd(s); 236: 237: if(ev.flags & EV_EOF) 238: { 239: // errno in fflags? data should be zero bytes, right? 240: // can't write anything 241: fprintf(stderr, 242: "got EV_EOF on write, data bytes =%i (0?), errno/fflags?=%i\n", 243: (int)ev.data, ev.fflags); 244: } 245: // remove other outstanding here? 246: sv->wakeup_flags = PDEMUX_WRITE; 247: sv->wakeup(*this); 248: } 249: else 250: { 251: fprintf(stderr,"unsolicited event from kqueue (%i)...\n", ev.filter); 252: // no wakeup 253: } 254: } 255: }} 256: 257:
1: #line 3 "./lpsrc/flx_poll_demux.ipk" 2: #ifndef __FLX_DEMUX_POLL_DEMUXER_H__ 3: #define __FLX_DEMUX_POLL_DEMUXER_H__ 4: 5: #include <flx_demux_config.hpp> 6: #include "demux_posix_demuxer.hpp" 7: 8: // not re-entrant 9: 10: namespace flx { namespace demux { 11: 12: class DEMUX_EXTERN poll_demuxer : public posix_demuxer { 13: void* fd_array; // make him stop! 14: void* sv_array; 15: 16: virtual void get_evts(bool poll); 17: public: 18: poll_demuxer(); 19: virtual ~poll_demuxer(); 20: 21: virtual int add_socket_wakeup(socket_wakeup* sv, int flags); 22: 23: void get_arrays(void** fds, void** svs); 24: int dopoll(void* infds, bool poll_flag); // returns nevents 25: void process_evts(void* infds, void* svs, int nevts); 26: }; 27: 28: } } 29: #endif 30:
1: #line 34 "./lpsrc/flx_poll_demux.ipk" 2: #include <stdio.h> // my friend printf 3: #include <poll.h> 4: #include <assert.h> 5: #include "demux_poll_demuxer.hpp" 6: 7: #include <vector> 8: 9: namespace flx { namespace demux { 10: 11: using namespace std; 12: 13: typedef vector<socket_wakeup*> sockvec; 14: typedef vector<struct pollfd> fdvec; 15: 16: #define FDS ((fdvec*)fd_array) 17: #define SOCS ((sockvec*)sv_array) 18: 19: // be aware that under os x 10.3 (and other systems?), poll is a thin 20: // user level layer on top of select and so of no real advantage. 21: // the select emulation doesn't seem to give POLLHUP errs, either. 22: // under 10.4 poll doesn't appear to be calling select, so I guess it's 23: // all good. 24: 25: poll_demuxer::poll_demuxer() 26: : fd_array(0), sv_array(0) 27: { 28: // fprintf(stderr, "poll_demuxer ctor\n"); 29: } 30: 31: poll_demuxer::~poll_demuxer() 32: { 33: // fprintf(stderr, "poll_demuxer dtor\n"); 34: if(SOCS) delete SOCS; 35: if(FDS) delete FDS; 36: } 37: 38: // breaking up for a thread safe impl 39: void 40: poll_demuxer::get_arrays(void** fds, void** svs) 41: { 42: *fds = fd_array; 43: *svs = sv_array; 44: 45: fd_array = 0; 46: sv_array = 0; 47: } 48: 49: // returns nevents 50: int 51: poll_demuxer::dopoll(void* infds, bool poll_flag) 52: { 53: fdvec* fds_copy = (fdvec*)infds; 54: 55: if(!fds_copy) 56: { 57: if(!poll_flag) fprintf(stderr, "Warning ::poll(\\inf) on zero fds!\n"); 58: return 0; 59: } 60: 61: struct pollfd* fds = &(*fds_copy)[0]; 62: // I sometimes end up with nothing to watch in poll mode 63: // this type doesn't seem to exist for the 10.3 emulated poll. 64: unsigned long nfds = (*fds_copy).size(); 65: // nfds_t nfds = (*fds_copy).size(); 66: int nevts; 67: 68: // fprintf(stderr, "calling ::poll with %p*%i fds\n", fds, nfds); 69: // -1 for timeout means wait indefinitely 70: nevts = ::poll(fds, nfds, (poll_flag) ? 0 : -1); 71: 72: if(-1 == nevts) 73: { 74: perror("poll_demuxer::get_evts"); 75: return 0; 76: } 77: 78: return nevts; // zero => timeout 79: } 80: 81: // processes events, calls callbacks, deletes fds & svs upon completion. 82: // Seems a bit busy, no? 83: void 84: poll_demuxer::process_evts(void* infds, void* svs, int nevts) 85: { 86: // Optimisation: when no events (due to timeout) and our fds 87: // are null (nothing changed in the meantime), just set the 88: // fd and svs members to the incoming ones and return 89: 90: // fprintf(stderr, "poll::process_evts: %i, %p\n", nevts, fd_array); 91: 92: if(0 == nevts && !fd_array) 93: { 94: // fprintf(stderr, "Optimising by resetting arrays!\n"); 95: assert( !sv_array ); // keep in sync 96: fd_array = infds; 97: sv_array = svs; 98: return; 99: } 100: 101: fdvec* fds_copy = (fdvec*)infds; 102: sockvec* socs_copy = (sockvec*)svs; 103: 104: struct pollfd* fds = &(*fds_copy)[0]; 105: unsigned long nfds = (*fds_copy).size(); 106: // nfds_t nfds = (*fds_copy).size(); 107: 108: // sanity check. looks like read and write count as 1 each 109: int evts_encountered = 0; 110: 111: // examine all fds for signs of life. try early out with nevts? 112: // for(nfds_t i = 0; i < nfds; i++, fds++) 113: for(unsigned long i = 0; i < nfds; i++, fds++) 114: { 115: // fprintf(stderr, "fds[%i]->revents=%x\n", i, fds->revents); 116: 117: socket_wakeup* sv = (*socs_copy)[i]; 118: 119: // accumulate bit field of what we got 120: // don't touch original bits, we might have to restore them 121: int wakeup_flags = 0; 122: 123: bool wake = false; 124: 125: // it might be possible to get both a read & write event... 126: // in which case I should take out the else below 127: if(fds->revents & POLLIN) // I think this is how you do it 128: { 129: // fprintf(stderr,"POLLIN for %p->%i\n", sv, sv->s); 130: wakeup_flags |= PDEMUX_READ; 131: wake = true; 132: evts_encountered++; 133: } 134: 135: if(fds->revents & POLLOUT) 136: { 137: // fprintf(stderr,"POLLOUT for %p->%i\n", sv, sv->s); 138: wakeup_flags |= PDEMUX_WRITE; 139: wake = true; 140: evts_encountered++; 141: } 142: 143: // check here for the unsolicited POLLERR, POLLHUP and POLLNVALs 144: if(fds->revents & POLLERR) 145: { 146: fprintf(stderr, "POLLERR for %p->%i\n", sv, sv->s); 147: wakeup_flags |= PDEMUX_ERROR; 148: wake = true; // good to do? 149: } 150: 151: // device has been disconnected. this and POLLOUT are mutually exclusive. 152: // a stream can never be writeable again if a hangup has occured. 153: // I've seen POLLHUPs come in for shutdown(s, 1). In this case you want 154: // the wakeup, at least if you were waiting to write. POLLHUPs also seem 155: // to be the message/wake up when reading from a connection that has 156: // closed: you get the remaining bytes, but via POLLHUP rathern POLLOUT. 157: // perhaps not worth printing, seeing as this usage is quite common 158: if(fds->revents & POLLHUP) 159: { 160: fprintf(stderr, "POLLHUP for %p->%i\n", sv, sv->s); 161: assert((fds->revents & POLLOUT) == 0); 162: wakeup_flags |= PDEMUX_EOF; 163: wake = true; // good to do? probably. 164: } 165: 166: // Invalid fd. We shouldn't ever get that. 167: if(fds->revents & POLLNVAL) 168: { 169: fprintf(stderr, "POLLNVAL for %p->%i\n", sv, sv->s); 170: wake = true; // good to do? 171: } 172: 173: if(wake) 174: { 175: // 1-1 wakeups with add_sockets 176: // be aware that callback may add back... 177: sv->wakeup_flags = wakeup_flags; 178: sv->wakeup(*this); 179: } 180: else 181: { 182: // reinstall for the next iteration. note that we keep a copy 183: // of the flags in sv->wakeup_flags, set on adding. that belongs 184: // to us so there should be no problem there. 185: //fprintf(stderr, "poll::readding: %i, %x\n", 186: // sv->s, sv->wakeup_flags); 187: if(add_socket_wakeup(sv, sv->wakeup_flags) == -1) 188: fprintf(stderr, "poll re-add finished immediately!?!\n"); 189: } 190: } 191: 192: // keep the bastards honest 193: if(evts_encountered != nevts) 194: { 195: fprintf(stderr, "poll seen/nevts mismatch: %i/%i\n", 196: evts_encountered, nevts); 197: } 198: 199: // delete all here. 200: delete fds_copy; 201: delete socs_copy; 202: } 203: 204: 205: // poll is the call, call the bool poll_flag 206: void 207: poll_demuxer::get_evts(bool poll_flag) 208: { 209: // fprintf(stderr, "poll_demuxer::get_evts\n"); 210: void *fds, *svs; 211: int nevts; 212: 213: get_arrays(&fds, &svs); // we now own them. must call process_evts 214: // to give them back. 215: 216: nevts = this->dopoll(fds, poll_flag); 217: 218: // don't shortcut based on nevts being zero - pass it on to process_evts 219: // it recongnises the optimisation opportunity uation and handles it, 220: // which has the advantage of benefiting the threadsafe version too. 221: process_evts(fds, svs, nevts); 222: } 223: 224: // precondition: not currently in get_evts (not reentrant) 225: int 226: poll_demuxer::add_socket_wakeup(socket_wakeup* sv, int flags) 227: { 228: // fprintf(stderr, "poll::add_socket_wakeup: %p->%i, %x\n", 229: // sv, sv->s, flags); 230: 231: if(!FDS) 232: { 233: // fprintf(stderr, "creating fds and svns\n"); 234: assert(SOCS == NULL); // both should be null or non null, no mix 235: 236: fd_array = new fdvec; // FDS 237: sv_array = new sockvec; // SOCS 238: } 239: 240: // add to array 241: struct pollfd fd; 242: 243: // note that we keep a copy of the flags, because we often have to 244: // copy the wakeups back that haven't had activity 245: sv->wakeup_flags = flags; 246: 247: fd.fd = sv->s; 248: fd.events = 0; 249: // set on output, but in the ambiguous case of a poll that returns due 250: // to timeout what would it be? If I can guarantee 0, then I can use the 251: // same piece of code to re-add the fds in the thread safe version. 252: fd.revents = 0; 253: 254: if(flags & PDEMUX_READ) fd.events |= POLLIN; 255: if(flags & PDEMUX_WRITE) fd.events |= POLLOUT; 256: 257: // don't bother setting POLLERR or POLLHUP. They're output (revents) only. 258: // is the same true for epoll_demuxer? I'd say so... 259: 260: // fd.revents is set on output by ::poll 261: // fprintf(stderr, "turning all revents bits on to test 0 output\n"); 262: // fd.revents = -1; 263: 264: assert(0 != fd.events); 265: 266: // add to array along with sv pointer 267: FDS->push_back(fd); 268: SOCS->push_back(sv); 269: 270: return 0; // there'll be a wakeup 271: } 272: 273: } } 274:
1: #line 309 "./lpsrc/flx_poll_demux.ipk" 2: #ifndef __FLX_DEMUX_TS_POLL_DEMUXER_H__ 3: #define __FLX_DEMUX_TS_POLL_DEMUXER_H__ 4: 5: // thread safe version of poll_demuxer 6: 7: #include "demux_poll_demuxer.hpp" 8: #include "demux_self_piper.hpp" // self pipe trick 9: #include "pthread_mutex.hpp" 10: 11: namespace flx { namespace demux { 12: 13: class ts_poll_demuxer : public posix_demuxer { 14: // lock 15: flx::pthread::flx_mutex_t ham_fist; 16: // protects this little fella here. 17: poll_demuxer demux; 18: 19: self_piper sp; 20: protected: 21: virtual void get_evts(bool poll); 22: public: 23: ts_poll_demuxer(); 24: ~ts_poll_demuxer(); 25: 26: virtual int add_socket_wakeup(socket_wakeup* sv, int flags); 27: 28: // pass it on to composed non-threadsafe demuxer 29: virtual demux_quit_flag* get_quit_flag() { return demux.get_quit_flag(); } 30: virtual void set_quit_flag(demux_quit_flag* f) { demux.set_quit_flag(f); } 31: }; 32: 33: }} // namespace demux, flx 34: 35: #endif 36:
1: #line 346 "./lpsrc/flx_poll_demux.ipk" 2: #include "demux_ts_poll_demuxer.hpp" 3: #include <stdio.h> 4: 5: namespace flx { namespace demux { 6: 7: ts_poll_demuxer::ts_poll_demuxer() 8: { 9: //fprintf(stderr, "ts_poll_demuxer installing self-piper\n"); 10: // install self pipe trick. 11: sp.install(&demux); 12: } 13: 14: ts_poll_demuxer::~ts_poll_demuxer() 15: { 16: //fprintf(stderr, "ts_polling asking thread to quit\n"); 17: async_quit(); // get async waiting thread to stop waiting 18: //fprintf(stderr, "ts_poll async quit finished\n"); 19: } 20: 21: void 22: ts_poll_demuxer::get_evts(bool poll) 23: { 24: void *fds, *svs; 25: 26: // copy args under lock 27: { 28: flx::pthread::flx_mutex_locker_t locker(ham_fist); 29: demux.get_arrays(&fds, &svs); // the arrays are now mine 30: // lock released 31: } 32: 33: // do the poll 34: int nevts = demux.dopoll(fds, poll); 35: 36: // regardless of the number of events, I have to copy the pieces back 37: // under lock tutelage 38: { 39: flx::pthread::flx_mutex_locker_t locker(ham_fist); 40: demux.process_evts(fds, svs, nevts); 41: // lock released 42: } 43: } 44: 45: // thread safe overloaded functions follow. all acquire the lock 46: // then call the unthreadsafe version of the function. nice! 47: int 48: ts_poll_demuxer::add_socket_wakeup(socket_wakeup* sv, int flags) 49: { 50: // fprintf(stderr, "ts_poll::add_sock(%i, %x)\n", sv->s, flags); 51: flx::pthread::flx_mutex_locker_t locker(ham_fist); 52: 53: int res = demux.add_socket_wakeup(sv, flags); 54: // I wouldn't touch the sv after this. 55: 56: if(-1 != res) sp.wake(); 57: 58: return res; 59: } 60: }} 61: 62:
1: #line 3 "./lpsrc/flx_select_demux.ipk" 2: #ifndef __FLX_DEMUX_SELECT_DEMUXER_H__ 3: #define __FLX_DEMUX_SELECT_DEMUXER_H__ 4: 5: #include "demux_posix_demuxer.hpp" 6: #include <sys/types.h> // for sys/select.h on osx 7: #include <sys/select.h> // for fd_set 8: #include <sys/time.h> // GUSI WTF? 9: #include <unistd.h> // for bsd 10: 11: // Unlike the other demuxers, this one is NOT thread safe, so wait and 12: // add socket wakeup must only be called from the same thread. 13: // if you're looking for the thread safe version, try ts_select_demuxer 14: 15: namespace flx { namespace demux { 16: 17: class DEMUX_EXTERN select_demuxer : public posix_demuxer { 18: void remove_fd(int s); 19: 20: // thanks Beej! 21: fd_set master_read_set; // fd watched for reading 22: fd_set master_write_set; // for writing 23: fd_set master_except_set; // for exceptions 24: 25: // read sveglias - note we only have one set, so currently this demuxer 26: // cannot have separate wakeups for the same file descriptor. this 27: // fits in fine with the "undefined" nature of doing that. 28: socket_wakeup* svs[FD_SETSIZE]; // read sveglias 29: //socket_wakeup* write_svs[FD_SETSIZE]; // write wakeups 30: 31: int fdmax; // high watermark for select 32: 33: protected: 34: virtual void get_evts(bool poll); 35: 36: public: 37: // get_evts broken into pieces for thread safe implementations 38: void copy_sets(fd_set& rset, fd_set& wset, fd_set& exset); 39: // returns true if process_sets should be called. 40: bool select(fd_set& rset, fd_set& wset, fd_set& exset, bool poll); 41: // these could be consts 42: void process_sets(fd_set& rset, fd_set& wset, fd_set& exset); 43: 44: select_demuxer(); 45: 46: virtual int add_socket_wakeup(socket_wakeup* sv, int flags); 47: }; 48: }} // namespace demux, flx 49: #endif 50:
1: #line 54 "./lpsrc/flx_select_demux.ipk" 2: #ifndef __FLX_DEMUX_TS_SELECT_DEMUXER_H__ 3: #define __FLX_DEMUX_TS_SELECT_DEMUXER_H__ 4: 5: #include "demux_select_demuxer.hpp" 6: #include "demux_self_piper.hpp" 7: #include "pthread_mutex.hpp" 8: 9: namespace flx { namespace demux { 10: 11: // thread safe version of select demuxer 12: 13: class DEMUX_EXTERN ts_select_demuxer : public posix_demuxer { 14: // lock 15: flx::pthread::flx_mutex_t ham_fist; 16: // protects this little fella here. 17: select_demuxer demux; 18: 19: // self pipe trick for waking waiting thread when we like. 20: // for demuxer responsiveness. 21: self_piper sp; 22: protected: 23: virtual void get_evts(bool poll); 24: public: 25: ts_select_demuxer(); 26: ~ts_select_demuxer(); 27: 28: virtual int add_socket_wakeup(socket_wakeup* sv, int flags); 29: 30: // pass it on to composed non-threadsafe demuxer 31: virtual demux_quit_flag* get_quit_flag() { return demux.get_quit_flag(); } 32: virtual void set_quit_flag(demux_quit_flag* f) { demux.set_quit_flag(f); } 33: }; 34: }} // namespace demux, flx 35: 36: #endif 37:
1: #line 92 "./lpsrc/flx_select_demux.ipk" 2: // P.S. for current impl don't need the pthreads. WHOO!!! 3: 4: // A very light wrapper around select, that allows the addition 5: // of new sockets and returns status in a queue. 6: // on the powerbook with 10.3, FD_SETSIZE is 1024, that means 7: // max 1024 sockets. That's kind of lame. See IO completion ports 8: // on NT for a better solution. 9: 10: // See ACE_Handle_Set_Iterator for an optimised seelect bit field examination 11: // algorithm (p149 C++ network programming, volume1) 12: 13: // see epoll, kqueue & IOCPs 14: 15: // is select level triggered? I think it is. 16: // strangely, I'm never seeing anything from the exception set. 17: 18: #include "demux_select_demuxer.hpp" 19: 20: #include <assert.h> 21: #include <string.h> // memset 22: 23: #include <stdio.h> // printf debug 24: #include <stdlib.h> 25: #include "demux_sockety.hpp" // get_socket_error 26: #include <memory> 27: #include <sys/time.h> 28: 29: namespace flx { namespace demux { 30: 31: select_demuxer::select_demuxer() 32: { 33: // clear these guys. after the thread starts, access to them will have 34: // to be via the lock 35: FD_ZERO(&master_read_set); 36: FD_ZERO(&master_write_set); 37: FD_ZERO(&master_except_set); 38: fdmax = 0; // corresponds to stdin, which we're not using 39: 40: // clear this possibly quite large list 41: //memset(svs, 0, sizeof(svs)); 42: //JS: memset must not be used except for raw data or chars 43: std::uninitialized_fill_n(svs,FD_SETSIZE,(socket_wakeup*)0); 44: } 45: 46: // one select, must not block indefinitely, so choose a timeslice 47: // or find a way to make it wake on command, like a dummy socket 48: void 49: select_demuxer::get_evts(bool poll) 50: { 51: // to use select we must copy our arguments, as it changes them! 52: // this code has been broken up in to pieces so that I can implement 53: 54: fd_set read_set, write_set, except_set; 55: 56: copy_sets(read_set, write_set, except_set); 57: 58: if(select(read_set, write_set, except_set, poll)) 59: process_sets(read_set, write_set, except_set); 60: } 61: 62: int 63: select_demuxer::add_socket_wakeup(socket_wakeup* sv, int flags) 64: { 65: int s = sv->s; 66: 67: // fprintf(stderr, "adding select wakeup for %i, flags=%x\n", s, flags); 68: 69: if(s < 0 || s >= FD_SETSIZE) return -1; // weakness of select 70: 71: assert(svs[s] == NULL); // sanity check: nothing there 72: 73: if(flags & PDEMUX_READ) FD_SET(s, &master_read_set); 74: 75: if(flags & PDEMUX_WRITE) FD_SET(s, &master_write_set); 76: 77: // does this mean we could add a non-reading, non-writing socket 78: // and wait for errors on it? 79: FD_SET(s, &master_except_set); 80: 81: svs[s] = sv; // record wakeup. ours now. 82: 83: 84: if(s > fdmax) fdmax = s; // update highwater mark 85: 86: return 0; 87: } 88: 89: // removes for both reading AND writing. 90: void 91: select_demuxer::remove_fd(int s) 92: { 93: // fprintf(stderr, "removing select fd: %i\n", s); 94: 95: assert(s >= 0 && s < FD_SETSIZE); 96: assert(svs[s] != NULL); // there should be something there 97: 98: // clear them all regardless. 99: FD_CLR(s, &master_read_set); 100: FD_CLR(s, &master_write_set); 101: FD_CLR(s, &master_except_set); 102: 103: svs[s] = NULL; 104: } 105: 106: // virtual functions to be overridden for thread safe descendent 107: void 108: select_demuxer::copy_sets(fd_set& rset, fd_set& wset, fd_set& exset) 109: { 110: rset = master_read_set; 111: wset = master_write_set; 112: exset = master_except_set; 113: } 114: 115: bool 116: select_demuxer::select(fd_set& rset, fd_set& wset, fd_set& exset, bool poll) 117: { 118: // this is depending on my fake socket to wakeup. perhaps use the timer 119: // for now. 120: struct timeval tv, *tp = NULL; 121: 122: if(poll) 123: { 124: tv.tv_sec = 0; 125: tv.tv_usec = 0; 126: tp = &tv; 127: } 128: 129: // the return value here actually has significance 130: // sometimes I have to try again, or weed out bad fds. 131: //if(select(fdmax+1, &read_set, &write_set, &except_set, &tv) == -1) 132: // nah! wait forever. none of these things shutdown properly yet. 133: // it'll force the async new wakeup responsiveness 134: switch(::select(fdmax+1, &rset, &wset, &exset, tp)) 135: { 136: case 0: 137: return false; // timed out, don't process sets 138: break; 139: case -1: 140: // not the ideal reaction. I think this is where I weed out 141: // the bad socket(s). would need error set. 142: 143: // closing a socket without removing can get us here. that's pretty 144: // nasty, because our data would be stale. Try not to do that. I 145: // wonder if the except set would tell us when the socket was 146: // closed on us? Damn, you have to clear it, else you keep getting 147: // the same error. 148: perror("select"); 149: // fall through and examine except set 150: break; 151: } 152: return true; // call process_sets 153: } 154: 155: void 156: select_demuxer::process_sets(fd_set& rset, fd_set& wset, fd_set& exset) 157: { 158: // since we're about to traverse the socket sets anyway, we should 159: // note the highest fd seen, and make that the highwater mark. 160: // that way we wouldn't be guaranteed monotonically degrading performance. 161: 162: // might be worth keeping a low water mark as well. 163: // I guess this is why select sucks. On osx we can only watch 164: // about 1024 sockets. That sucks too. could allocate larger sets 165: // with malloc... see c++ network programming book. 166: 167: // like kqueues, this code could theoretically handle separate wakeups 168: // for read and write, should I do it? not right now. 169: int new_fdmax = 0; 170: 171: for(int i = 0; i <= fdmax; i++) 172: { 173: int flags = 0; 174: 175: if(FD_ISSET(i, &rset)) flags |= PDEMUX_READ; 176: 177: if(FD_ISSET(i, &wset)) flags |= PDEMUX_WRITE; 178: 179: // sorta suggests that I ought to call the wakeup and pass 180: // an error flag on to it. 181: if(FD_ISSET(i, &exset)) 182: { 183: // don't remove bad sockets - it's an error to close the socket 184: // or deallocate the wakeup without telling the source. when 185: // we get socket errors, we'd better hope that there's reading 186: // or writing to be done. 187: // under cygwin, closing down a socket (read, write or both) 188: // causes select to wake up with an exception bit. out of cygwin 189: // we only wake up. In both cases, the read bit is set so 190: // just handling the stuff seems to work. not sure about write. 191: // posix_demuxer::socket_recv thinks the connection's closed, but 192: // it all seems to work out. Yours, Confused. 193: 194: fprintf(stderr, "select error on socket %i, flags=%x\n", 195: i, flags); 196: 197: int err; 198: // heh heh, this isn't great to call on the pipe that is used 199: // in the self pipe trick. I don't know why it's getting an 200: // err anyway. 201: if(get_socket_error(i, &err) == -1) 202: fprintf(stderr, "get_socket_error failed!?!\n"); 203: 204: fprintf(stderr, "socket err = %i, %s\n", err, strerror(err)); 205: // don't remove! see below 206: // remove_fd(i); 207: } 208: 209: // 210: if(flags) 211: { 212: socket_wakeup* sv = svs[i]; 213: // remove before wakeup so wakeup can add itself back, 214: // if necessary. 215: remove_fd(i); 216: 217: sv->wakeup_flags = flags; 218: sv->wakeup(*this); 219: } 220: 221: // to lower high-watermark, keep track of highest seen. 222: if(svs[i]) new_fdmax = i; 223: } 224: 225: // fprintf(stderr, "new_fdmax=%i, fdmax=%i\n", new_fdmax, fdmax); 226: 227: fdmax = new_fdmax; // copy it back 228: } 229: 230: }} // flx, demux 231:
1: #line 324 "./lpsrc/flx_select_demux.ipk" 2: #include "demux_ts_select_demuxer.hpp" 3: 4: // #include <stdio.h> 5: 6: namespace flx { namespace demux { 7: 8: ts_select_demuxer::ts_select_demuxer() 9: { 10: // fprintf(stderr, "creating pipe for self-pipe trick\n"); 11: // install self pipe trick 12: sp.install(&demux); 13: } 14: 15: ts_select_demuxer::~ts_select_demuxer() 16: { 17: async_quit(); // wake thread, ask to leave. 18: } 19: 20: void 21: ts_select_demuxer::get_evts(bool poll) 22: { 23: fd_set rset, wset, exset; 24: 25: // copy args under lock 26: { 27: flx::pthread::flx_mutex_locker_t locker(ham_fist); 28: demux.copy_sets(rset, wset, exset); 29: } 30: 31: // process arg set under lock. note that the select demuxer is passed 32: // to wakeups, so no recursive lock is needed. also, because any 33: // readditions caused by the callback are done to the naive demuxer, 34: // no selfpipe writes are required either. the only thing to remember 35: // is that the wakeup recipient should not be surprised to see a demuxer 36: // in its callback different to the one it originally added to. 37: if(demux.select(rset, wset, exset, poll)) 38: { 39: flx::pthread::flx_mutex_locker_t locker(ham_fist); 40: demux.process_sets(rset, wset, exset); 41: } 42: } 43: 44: // thread safe overloaded functions follow. all acquire the lock 45: // then call the unthreadsafe version of the function. nice! 46: int 47: ts_select_demuxer::add_socket_wakeup(socket_wakeup* sv, int flags) 48: { 49: // fprintf(stderr, "ts_select::add: %p->%i (%s), %x\n", 50: // sv, s, (s == self_pipe_fds[0]) ? "self pipe" : "socket", flags); 51: flx::pthread::flx_mutex_locker_t locker(ham_fist); 52: 53: int res = demux.add_socket_wakeup(sv, flags); 54: // I wouldn't touch the sv after this. 55: 56: // we have a new socket, so wake the event waiting thread 57: // for great responsiveness. 58: if(-1 != res) sp.wake(); 59: 60: return res; 61: 62: // we need to wake a blocking get_evts call here else we'll have bad 63: // performance or even a lockup. the question is do we need to do it under 64: // the tutelage of the lock? 65: } 66: 67: }} 68: 69:
1: #line 3 "./lpsrc/flx_iocp_demux.ipk" 2: #ifndef __FLX_DEMUX_IOCP_DEMUXER_H__ 3: #define __FLX_DEMUX_IOCP_DEMUXER_H__ 4: 5: #include <flx_demux_config.hpp> 6: //#include <Windows.h> 7: // be specific - flx_rtl_config.h now jigs it so that windows.h does not 8: // include winsock version 1 headers by default. this was making the order 9: // of inclusion of windows.h and winsock2.h significant with cl.exe. 10: #include <WinSock2.h> 11: 12: #include "demux_demuxer.hpp" 13: #include "pthread_sleep_queue.hpp" 14: 15: 16: namespace flx { namespace demux { 17: 18: // not here? returns INVALID_SOCKET on failure. 19: // if *io_port == 0, then a port is chosen and returned in *io_port 20: SOCKET DEMUX_EXTERN create_listener_socket(int* io_port, int backlog); 21: // these two probably not used. move to wsockety.h 22: SOCKET DEMUX_EXTERN nice_accept(SOCKET listener); 23: SOCKET DEMUX_EXTERN nice_connect(const char* addr, int port); 24: int DEMUX_EXTERN set_tcp_nodelay(int s, int disable_nagle); 25: 26: // ******************************************************** 27: /// make sure you instantion ONE (1) of these before using winsock 28: // ******************************************************** 29: class DEMUX_EXTERN winsock_initer 30: { 31: public: 32: winsock_initer(); 33: ~winsock_initer(); 34: }; 35: 36: // ******************************************************** 37: /// iocp_wakeup base class for users of iocp_demuxer 38: /// becoming an overlapped call control block 39: // ******************************************************** 40: class DEMUX_EXTERN iocp_wakeup { 41: protected: // folks need to use these in win 32 calls 42: OVERLAPPED ol; 43: // store wakeup error here? 44: // I didn't want this to be felixy, useful though. 45: void clear_overlapped(); // zero the OVERLAPPED structure 46: public: 47: // 2 possibilities for piggybacking data. who could ask for more? 48: // udat = per iocp association, olp = per overlapped function call. 49: // why don't I need this in the posix version? 50: virtual void iocp_op_finished( DWORD nbytes, ULONG_PTR udat, 51: LPOVERLAPPED olp, int err) = 0; 52: 53: // start overlapped async operation. returns true if it finished 54: // immediately. in this case there will be no iocp_op_finished wakeup. 55: // assumes all args ready for call. 56: virtual bool start_overlapped() = 0; 57: 58: // retrieves this pointer from OVERLAPPED pointer 59: static iocp_wakeup* from_overlapped(LPOVERLAPPED olp); 60: }; 61: 62: // ******************************************************** 63: // ******************************************************** 64: class DEMUX_EXTERN iocp_demuxer : public demuxer { 65: HANDLE iocp; // the io completion queue 66: 67: void get_evts(bool poll); 68: public: 69: iocp_demuxer(); 70: virtual ~iocp_demuxer(); 71: 72: // udat is the per IOCP object user cookie & the overlapped pointer 73: // is the per overlapped operation cookie (sort of), so in the case 74: // of acceptex, udat is set when the listener is associated with the 75: // iocp and is passed to the subsequent acceptex iocp wakeups. 76: // probably won't be used very often 77: // the OVERLAPPED retrieved from the iocp is assumed to be part of 78: // an iocp_wakeup - beware! returns 0 on success, -1 on failure. 79: int associate_with_iocp(HANDLE obj, ULONG_PTR udat); 80: 81: }; 82: 83: }} // namespace demux, flx 84: #endif 85: 86:
1: #line 90 "./lpsrc/flx_iocp_demux.ipk" 2: #include "demux_iocp_demuxer.hpp" 3: #include "demux_quitter.hpp" // for clean threaded iocp takedown 4: 5: #include <stdio.h> // for printf debugging 6: #include <stddef.h> // offsetof 7: #include <assert.h> 8: // shoving the win_queue in here for now 9: 10: namespace flx { namespace demux { 11: 12: // this could really do with auto objs. steal the strat stuff? 13: 14: // add windows error processing macros. It's a bore otherwise. 15: 16: // WaitForSingleObject on an kill event in the thread for thread cancel 17: // kill_event = CreateEvent(NULL, TRUE, FALSE, NULL); (what's that) 18: // SetEvent(kill_event) to invoke (?): SetEvent sets the event to the 19: // signalled state. Return value is success flag. GetLastError. 20: 21: // do auto SOCKET wrapper, check closesocket return code. 22: 23: // a completion port is a queue into which the os puts notifications of 24: // completed overlapped io requests. once the operation completes, a 25: // notification is sent to a worker thread that can process the result. 26: // a socket may be associated with a completion port at any point after 27: // creation. 28: 29: 30: // I don't see how to nicely stop a thread, I may have to have my own protocol 31: // to ask it to exit. 32: 33: // PostQueuedCompletionStatus can be used by threads to wake up a worker 34: // thread. Could be handy replacement for timeout. "useful for notifying 35: // worker threads of external events" 36: 37: // working through this: http://msdn.microsoft.com/msdnmag/issues/1000/Winsock/ 38: // example of worker thread here 39: // http://msdn.microsoft.com/library/default.asp?url=/library/en-us/msmq/msmq_using_reading_msg_98j7.asp 40: // http://msdn.microsoft.com/library/default.asp?url=/library/en-us/fileio/fs/i_o_completion_ports.asp 41: // nono, use this onec 42: // http://msdn.microsoft.com/library/default.asp?url=/library/en-us/msmq/msmq_using_reading_msg_98j7.asp 43: // oh, wait they're the same 44: // FormatMessge 45: 46: winsock_initer::winsock_initer() 47: { 48: WSADATA wsaData; 49: 50: // strange, rt_faio_01 static doesn't work under vs without this. 51: // hum. 52: // fprintf(stderr, "WINSOCK INIT!!!\n"); 53: // apparently 2.2's the way to go 54: int res= WSAStartup(MAKEWORD(2, 2), &wsaData); 55: if(res!= 0) 56: { 57: //JS: WSAGetLastError CANNOT be called, since WSAStartup failed 58: fprintf(stderr,"couldn't find usable winsock dll: %i\n", res); 59: throw res; 60: } 61: } 62: 63: winsock_initer::~winsock_initer() 64: { 65: if(WSACleanup() != 0) 66: { 67: fprintf(stderr,"WSACleanup failed %i\n", WSAGetLastError()); 68: } 69: } 70: 71: // iocp_wakeup base class for users of iocp_demuxer 72: //static 73: iocp_wakeup* 74: iocp_wakeup::from_overlapped(LPOVERLAPPED olp) 75: { 76: // calculate the address of this from overlapped member 77: // suffer an obligatory offsestof warning from broken gccs. 78: return (iocp_wakeup*)((char*)olp-offsetof(iocp_wakeup, ol)); 79: } 80: 81: void 82: iocp_wakeup::clear_overlapped() 83: { 84: ZeroMemory(&ol, sizeof(ol)); // much better than memset, right? 85: } 86: 87: 88: iocp_demuxer::iocp_demuxer() 89: : iocp(NULL) 90: { 91: // Create the completion port 92: // not sure what first 3 args do, but by specifying INVALID_HANDLE_VALUE 93: // for the first I think I can ignore the rest (apart from the last, numthreads) 94: // I still have to create the threads, but only NumberOfConcurrentThreads 95: // will wake up from GetQueuedCompletionStatus at a time. This looks to be 96: // slightly elastic... 97: // NT 3.51 doesn't let you pass null filehandle, you've got to have a dummy 98: // socket. keep that in mind. see InitializeIOCP in IOCPServer.cpp example 99: // taken from codeproject. GetSystemInfo to find out num CPUs 100: //fprintf(stderr,"CreateIoCompletionPort with ONE WORKER THREAD\n"); 101: iocp = CreateIoCompletionPort( 102: INVALID_HANDLE_VALUE, 103: NULL, 104: (ULONG_PTR)0, 105: 1 // 1 thread (zero means one for each CPU) 106: ); 107: 108: if(NULL == iocp) 109: { 110: DWORD err = GetLastError(); 111: fprintf(stderr,"failed to create completion port: %li\n", err); 112: throw -1; 113: } 114: } 115: 116: iocp_demuxer::~iocp_demuxer() 117: { 118: //fprintf(stderr, "~iocp (%p) NOW WITH ASYNC QUITTER!\n", iocp); 119: try 120: { 121: demux_quitter q; 122: q.quit(this); 123: } 124: catch(...) 125: { 126: fprintf(stderr, "~iocp_demuxer async quit threw exception!\n"); 127: // now what do we do? 128: } 129: 130: 131: if(NULL != iocp && !CloseHandle(iocp)) 132: { 133: DWORD err = GetLastError(); 134: fprintf(stderr,"failed cleanup iocp: %li\n", err); 135: } 136: } 137: 138: int 139: iocp_demuxer::associate_with_iocp(HANDLE obj, ULONG_PTR udat) 140: { 141: // fprintf(stderr, "associating with iocp=%p: %p, udat: %lx\n", 142: // iocp, obj, udat); 143: 144: // Any overlapped operations performed on the object will use the 145: // completion port for notification. The 3rd param can be used to pass 146: // per object context information. we'll just pass that back. 147: if(CreateIoCompletionPort(obj, iocp, udat, 0) == NULL) { 148: // adding the same obj twice without an intervening get completion 149: // status wakup gets an error 87, ERROR_INVALID_PARAMETER 150: fprintf(stderr,"CreateIoCompletionPort failed to register object: %li\n", 151: GetLastError()); 152: return -1; 153: } 154: 155: return 0; 156: } 157: 158: void 159: iocp_demuxer::get_evts(bool poll) { 160: // with multiple threads, this will actually wake up the last to 161: // block (lifo) 162: 163: // get context, call worker_thread 164: // need to be able to tell which thing completed, can have extra data 165: // following some kind of struct 166: // get this pointer 167: 168: // I guess to avoid swapping of thread context. By calling this on a given 169: // completion port this thread is associated with it until exit or respec 170: DWORD nbytes; // number of bytes in io transaction 171: ULONG_PTR udat; // user data - not using this atm 172: LPOVERLAPPED olp; // we get iocp_wakeup from this. 173: 174: // If a socket handle associated with a completion port is closed, 175: // GetQueuedCompletionStatus returns ERROR_SUCCESS, with *lpOverlapped 176: // non-NULL and lpNumberOfBytes equal zero. 177: 178: int err = NO_ERROR; 179: 180: // No timeout. What does false mean? Eh. Could need a timeout to bring 181: // the thread down. 182: if(!GetQueuedCompletionStatus(iocp, &nbytes, &udat, &olp, 183: (poll) ? 0: INFINITE)) 184: { 185: // That's strange - I sometimes get my ConnectEx errors popping 186: // out here (ERROR_SEM_TIMEOUT=121, ERROR_CONNECTION_REFUSED=1225) 187: // it looks like my args (overlapped, etc) are still filled out, so 188: // I can still awake the sleeper 189: err = GetLastError(); // doco says this & not WSALastError. 190: 191: // let's see: yep - there's my overlapped 192: // fprintf(stderr,"!iocp::wait: nbytes=%li, udat=%lx, io=%p, err=%i\n", 193: // nbytes, udat, olp, err); 194: 195: if(WAIT_TIMEOUT == err) 196: { 197: // we get this a lot now that we can poll the iocp, so no output 198: // interestingly, nbytes = 1. what could that mean? 199: return; // no wakeup 200: } 201: else if(ERROR_OPERATION_ABORTED == err) 202: { 203: // that's real bad manners. Or I could just ignore it. Anyway, 204: // any overlapped received is stale. 205: fprintf(stderr, "WHOA!!! - disassociate before killing handle\n"); 206: return; // no wakeup 207: } 208: else 209: { 210: fprintf(stderr,"GetQueuedCompletionStatus returned false: %i\n", 211: err); 212: // return here? relying on olp being NULL, to stop us dereffing 213: } 214: 215: // I'm going to assume that there's a good wakeup, and fall through 216: // We need to wakeup on some errors (like ERROR_CONNECTION_REFUSED) 217: // FALL THROUGH 218: } 219: 220: // An IOCP is a very general event mechanism. It tells you not only about 221: // the completion of reads & writes, but also of pretty much any asynchronous 222: // event's completion. It doesn't quite fit in with my select style interfaces. 223: // I've got general overlapped things completing here. I don't want them to 224: // know about demuxers & so forth so I'll have to know about them. 225: 226: //fprintf(stderr,"HOLEY! Woke up!\n"); 227: //fprintf(stderr,"nbytes=%li, udat=%lx, olp=%p, err=%i\n", 228: // nbytes, udat, olp, err); 229: 230: // with polling it's normal not to get an overlapped pointer, because 231: // we may simply have timed out 232: assert( olp ); 233: 234: // tell someone that some overlapped op finished 235: iocp_wakeup* wakeup = iocp_wakeup::from_overlapped(olp); 236: 237: // passing olp may be redundant, seeing as it's contained in iocp_wakeup 238: wakeup->iocp_op_finished(nbytes, udat, olp, err); 239: } 240: 241: 242: 243: // simple utility fn, shouldn't be here. creates listener on any interface. 244: // this could benifit from a SOCKET class. in failure returns INVALID_SOCKET 245: // CURRENTLY EATS ERROR, SO DON'T BOTHER CHECKING 246: SOCKET 247: create_listener_socket(int* io_port, int backlog) 248: { 249: fprintf(stderr,"creating_listener_socket\n"); 250: SOCKET listener; 251: 252: // could use WSASocket, but these seem to be turning out overlapped anyway 253: // at least after tangling with overlapped functions. 254: // socket returns INVALID_SOCKET on failure. 255: listener = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP); 256: 257: if (INVALID_SOCKET == listener) 258: { 259: fprintf(stderr,"listener create failed: %i\n", WSAGetLastError()); 260: return INVALID_SOCKET; 261: } 262: 263: SOCKADDR_IN addr; 264: 265: // msdn code examples don't zero the sockaddr. That makes me nervous. 266: ZeroMemory(&addr, sizeof(addr)); 267: addr.sin_family = AF_INET; 268: addr.sin_addr.s_addr = htonl(INADDR_ANY); 269: addr.sin_port = htons(*io_port); 270: 271: // bind our name to the socket 272: int res; 273: res = bind(listener, (LPSOCKADDR)&addr, sizeof(addr)); 274: 275: if (SOCKET_ERROR == res) 276: { 277: fprintf(stderr,"bind() failed %i\n", WSAGetLastError()); 278: if(closesocket(listener) == SOCKET_ERROR) 279: { 280: fprintf(stderr,"closesocket failed on listener: %i\n", 281: WSAGetLastError()); 282: } 283: return INVALID_SOCKET; 284: } 285: 286: // if user wanted port chosen tell them what it turned out to be 287: if(0 == *io_port) 288: { 289: int namelen = sizeof(addr); 290: if (getsockname(listener, (struct sockaddr *)&addr, &namelen) 291: == SOCKET_ERROR) 292: { 293: fprintf(stderr, "getsockname failed (%i)\n", WSAGetLastError()); 294: 295: if(closesocket(listener) == SOCKET_ERROR) 296: { 297: fprintf(stderr,"closesocket failed on listener: %i\n", 298: WSAGetLastError()); 299: } 300: return INVALID_SOCKET; 301: } 302: 303: *io_port = ntohs(addr.sin_port); 304: } 305: 306: // Set the socket to listen 307: res = listen(listener, backlog); 308: if (SOCKET_ERROR == res) 309: { 310: fprintf(stderr,"listen() failed %i\n", WSAGetLastError()); 311: 312: if(closesocket(listener) == SOCKET_ERROR) 313: { 314: fprintf(stderr,"closesocket failed on listener: %i\n", 315: WSAGetLastError()); 316: } 317: 318: return INVALID_SOCKET; 319: } 320: 321: return listener; 322: } 323: 324: // currently the following aren't used. Look forward to warnings 325: // about them. 326: 327: // the posix version of this made the socket nonblocking. 328: // I don't seem to have to do that when using iocp. if you 329: // want to create a nonblocking socket (or overlapped) pass 330: // WSA_FLAG_OVERLAPPED to WSASocket. I've never had to 331: // actually do this. How do you make accept do this? (supposing 332: // you wanted to) WSAAccept doesn't have a flag for it (however 333: // it does let you do conditional accepting). 334: // There doesn't seem to be a sockopt 335: // returns INVALID_SOCKET on failure. eats the err. 336: SOCKET 337: nice_accept(SOCKET listener) 338: { 339: struct sockaddr_in remoteaddr; 340: int addrlen = sizeof(remoteaddr); 341: SOCKET s; 342: 343: // accept returns INVALID_SOCKET when it fails 344: s = accept(listener, (struct sockaddr*)&remoteaddr, &addrlen); 345: 346: if(INVALID_SOCKET == s) 347: { 348: fprintf(stderr,"nice_accept failed (%i)\n", WSAGetLastError()); 349: } 350: 351: // the posix version makes the socket nonblocking here 352: // we're not bothering 353: 354: return s; 355: } 356: 357: // returns SOCKET_ERROR on failure, with err in WSAGetLastError() 358: static int 359: connect_sock(SOCKET s, const char* addr, int port) 360: { 361: struct sockaddr_in sock_addr; 362: 363: memset(&sock_addr, 0, sizeof(sock_addr)); 364: sock_addr.sin_family = AF_INET; 365: sock_addr.sin_addr.s_addr = inet_addr(addr); 366: sock_addr.sin_port = htons(port); 367: 368: return connect(s, (struct sockaddr *)&sock_addr, sizeof(sock_addr)); 369: } 370: 371: // returns INVALID_SOCKET on failure, eats last error with WSAGetLastError 372: // unlike the posix version, this does not make the socket nonblocking. 373: SOCKET 374: nice_connect(const char* addr, int port) 375: { 376: SOCKET s; 377: 378: if((s = socket(AF_INET, SOCK_STREAM, 0)) != INVALID_SOCKET 379: && connect_sock(s, addr, port) != SOCKET_ERROR) 380: { 381: return s; /* success! */ 382: } 383: 384: /* something happened (not as good as catch 22) */ 385: fprintf(stderr,"nice_connect failed (%i)\n", WSAGetLastError()); 386: 387: if(INVALID_SOCKET != s && closesocket(s) == SOCKET_ERROR) 388: fprintf(stderr,"nice close failed (%i)\n", WSAGetLastError()); 389: 390: return INVALID_SOCKET; 391: } 392: 393: // returns -1 on error with errno in WSAGetLastError. 0 otherwise. 394: // kind of crap. 395: int 396: set_tcp_nodelay(int s, int disable) 397: { 398: BOOL disable_nagle = (disable) ? true : false; 399: 400: int res = setsockopt(s, IPPROTO_TCP, TCP_NODELAY, 401: (const char*)&disable_nagle, sizeof(disable_nagle)); 402: 403: return (res == SOCKET_ERROR) ? -1 : 0; 404: } 405: 406: }} 407: 408:
1: #line 499 "./lpsrc/flx_iocp_demux.ipk" 2: #include "demux_overlapped.hpp" 3: #include <stdio.h> // fprintf 4: #include <assert.h> 5: 6: // cygwin's copy of mswsock.h leaves something to be desired... 7: #ifndef WSAID_CONNECTEX 8: typedef 9: BOOL 10: (PASCAL FAR * LPFN_CONNECTEX) ( 11: IN SOCKET s, 12: IN const struct sockaddr FAR *name, 13: IN int namelen, 14: IN PVOID lpSendBuffer OPTIONAL, 15: IN DWORD dwSendDataLength, 16: OUT LPDWORD lpdwBytesSent, 17: IN LPOVERLAPPED lpOverlapped 18: ); 19: 20: #define WSAID_CONNECTEX \ 21: {0x25a207b9,0xddf3,0x4660,{0x8e,0xe9,0x76,0xe5,0x8c,0x74,0x06,0x3e}} 22: #endif 23: 24: namespace flx { namespace demux { 25: 26: // windows includes files here? vs will be fussy. 27: 28: // AcceptEx 29: 30: // return async finished flag (error flags - can be transmitted via class) 31: // AcceptEx is the way to get accept connections via the IOCP 32: bool 33: acceptex_control_block::start_overlapped() 34: { 35: clear_overlapped(); 36: 37: // I've seen two examples get the pointer to AcceptEx, just in case it 38: // isn't implemented... 39: // fprintf(stderr,"AcceptExing: listen backlog => can succeed immediately\n"); 40: 41: // this is only set when acceptex receives data and returns immediately. 42: // can't hurt to set it. 43: DWORD nbytes = 0; 44: BOOL success; 45: 46: // note that in order to get the wakeup packet, the listener must 47: // already be associated with the iocp. for future async io, the acceptor 48: // must be associated too. 49: success = AcceptEx(listener, acceptor, 50: accept_buf, // required - near/far address 51: 0, // receive data size - don't yet want this 52: ACCEPTEX_ADDR_SIZE, // must be nonzero 53: ACCEPTEX_ADDR_SIZE, // must be nonzero 54: &nbytes, // only set if fn completes. should be 0 55: &ol); // oblig. gets us back to the this ptr 56: 57: // if there is a backlog of connections, AcceptEx can return immediately 58: if(success) 59: { 60: // must clear the wait 61: fprintf(stderr,"WHOA! AcceptEx RETURNED SUCCESS IMMEDIATELY!\n"); 62: fprintf(stderr, "This could be bad, as wait should call op_finish\n"); 63: fprintf(stderr, "We also lose the udat cookie (set to NULL)\n"); 64: // handle the successful wakeup 65: // complete_async_op(demux, drv, nbytes, NO_ERROR); 66: // I hope they don't want the udat pointer, because I 67: // just made it up (0=NULL). Not using it anyway. 68: iocp_op_finished(nbytes, 0, &ol, NO_ERROR); 69: return false; // means no completion packet coming (correct?) 70: } 71: else 72: { 73: int err = WSAGetLastError(); 74: // can also return WSACONNRESET, which isn't so bad 75: if(ERROR_IO_PENDING == err) 76: { 77: // fprintf(stderr,"AcceptEx returned ERROR_IO_PENDING - that's normal\n"); 78: // This is the normal situation, fall through, leaving thread 79: // to sleep on wakeup call. 80: } 81: else 82: { 83: fprintf(stderr,"AcceptEx failed: %i\n", err); 84: fprintf(stderr,"returning true should wake thread to detect failure.\n"); 85: return true; // have self woken 86: } 87: } 88: return false; // async not finished 89: 90: } 91: 92: // ConnectEx 93: #if 0 94: // apparently we're supposed to do this now to make the acceptee inherit 95: // the listener's state. it is currently in the default state 96: //err = setsockopt( sAcceptSocket, 97: // SOL_SOCKET, 98: // SO_UPDATE_ACCEPT_CONTEXT, 99: // (char *)&sListenSocket, 100: // sizeof(sListenSocket) ); 101: #endif 102: 103: // what a pain in the arse (zzz) 104: // This doesn't exist in win2000, so it'll need to be synchronous there. 105: static int 106: GetConnectExAddr(SOCKET s, LPFN_CONNECTEX* conn_fn) 107: { 108: *conn_fn = NULL; 109: GUID GuidConnectEx = WSAID_CONNECTEX; 110: DWORD dwBytes; 111: int err; 112: 113: err = WSAIoctl(s, // why do I need this? 114: SIO_GET_EXTENSION_FUNCTION_POINTER, 115: &GuidConnectEx, 116: sizeof(GuidConnectEx), 117: conn_fn, 118: sizeof(*conn_fn), 119: &dwBytes, 120: NULL, NULL); // no overlapped, no completion fun ptr 121: // fprintf(stderr,"Get addr dwbytes: %li\n", dwBytes); 122: return err; 123: } 124: 125: // this is the weirdest. To use ConnectEx, the socket must be already bound. 126: // By trial and error, I found that it had to be bound to INADDR_ANY: 0. 127: // So strange. Apparently I don't have to do it again if I want to reuse. 128: static int 129: bind_socket(SOCKET s) 130: { 131: SOCKADDR_IN addr; 132: 133: ZeroMemory(&addr, sizeof(addr)); 134: addr.sin_family = AF_INET; 135: addr.sin_addr.s_addr = htonl(INADDR_ANY); 136: addr.sin_port = htons(0); 137: 138: return bind(s, (LPSOCKADDR)&addr, sizeof(addr)); 139: } 140: 141: bool 142: connectex_control_block::start_overlapped() 143: { 144: clear_overlapped(); 145: 146: // why not get this directly from the ConnectEx? 147: socket_err = ERROR_IO_PENDING; 148: 149: 150: DWORD bytes_sent = 0; // we're not sending data on connect (yet) 151: BOOL success; 152: 153: LPFN_CONNECTEX pfConnectEx; 154: 155: // unfortunate, will fix up later. 156: // fprintf(stderr,"Getting ConnectEx address\n"); 157: 158: // Turns out that ConnectEx isn't defined anywhere; I have to load its 159: // addr via WSAIoctl 160: // this is a bad way. make the driver cache it. why on earth is this 161: // call per-socket? does it really need to be that way? 162: if(GetConnectExAddr(s, &pfConnectEx) == SOCKET_ERROR) 163: { 164: fprintf(stderr,"GetConnectExAddr failed: %i\n", WSAGetLastError()); 165: return true; 166: } 167: 168: // fprintf(stderr,"about to connectex to %s:%i, %i\n", addy, p, s); 169: 170: // this is so strange - I have to bind the socket to the localhost. 171: // if I don't, ConnectEx returns EINVAL. in any case, I won't need 172: // to do this again if I reuse this socket. 173: if(bind_socket(s) == SOCKET_ERROR) 174: fprintf(stderr,"ConnectEx bind failed: %i\n", WSAGetLastError()); 175: 176: // I hope ConnectEx doesn't want this to hang around, because it's 177: // going to drop off the stack after this. 178: SOCKADDR_IN addr; 179: 180: // some examples don't zero the addr. That makes me nervous. 181: ZeroMemory(&addr, sizeof(addr)); 182: addr.sin_family = AF_INET; 183: addr.sin_addr.s_addr = inet_addr(addy); 184: addr.sin_port = htons(p); 185: 186: // in order to receive the wakeup packet, s must already be associated 187: // with the iocp. this is best done at socket creation time. for these 188: // sockets it's probably best to also bind them at the same time. 189: // that requires "purposed" sockets (CreateConnectSocket?). 190: // p.s. the default (waio_base) wakeup is doing fine for now. 191: 192: success = (*pfConnectEx)(s, // socket 193: (LPSOCKADDR)&addr, // connect address 194: sizeof(addr), // size thereof 195: NULL, // not sending any data yet, but we could 196: 0, // ditto 197: NULL, // should be zero until this changes 198: &ol); // oblig. gets us back to the this ptr 199: 200: // there's a caveat about the type of socket s becomes after ConnectEx. 201: // It's in some kind of default state and cannot be used with shutdown 202: // change it with setsockopt (?) 203: if(success) 204: { 205: fprintf(stderr,"WHOA! ConnectEx RETURNED SUCCESS IMMEDIATELY!\n"); 206: fprintf(stderr, "BAD: calls op_finish and loses udat cookie\n"); 207: // handle the successful wakeup. (udat=0, olp=&ol) 208: iocp_op_finished(bytes_sent, 0, &ol, NO_ERROR); 209: return false; // means no completion packet coming (correct?) 210: } 211: else 212: { 213: int err = WSAGetLastError(); 214: 215: if(ERROR_IO_PENDING == err) 216: { 217: // fprintf(stderr,"ConnectEx pending...\n"); 218: // This is the normal situation, fall through, leaving thread 219: // to sleep on wakeup call. 220: } 221: else 222: { 223: // maybe store the error here. that could work for all 224: // windows wakeups 225: fprintf(stderr,"ConnectEx failed: %i\n", err); 226: return true; // have self woken 227: } 228: } 229: return false; // not finished 230: } 231: 232: // TransmitFile 233: 234: bool 235: transmitfile_control_block::start_overlapped() 236: { 237: clear_overlapped(); 238: 239: // 0 bytes => transmit entire file 240: // the second zero means use the default chunk size 241: // the NULL is for mem buffers to bookend the file with. nothing yet. 242: // the final zero is for various flags, including a way of doing 243: // DisconnectEx style socket reuse (more widely compatible?) 244: 245: // in order to receive the wakeup, s must already be associated with the 246: // iocp. this is best done at socket creation time. 247: if(TransmitFile(s, file, 0, 0, &ol, NULL, flags)) 248: { 249: fprintf(stderr,"Transmit file succeeded immediately! waking...\n"); 250: return true; 251: } 252: else 253: { 254: DWORD err = WSAGetLastError(); 255: 256: // will need to actually signal something 257: // fprintf(stderr,"signal TransmitFile failure!\n"); 258: if(ERROR_IO_PENDING != err && WSA_IO_PENDING != err) 259: fprintf(stderr,"genuine error from TransmitFile: %li\n", err); 260: } 261: return false; 262: } 263: 264: 265: // SOCKET io using WSASend and WSARecv 266: 267: // windows style control blocks 268: wsasocketio_control_block::wsasocketio_control_block(SOCKET src, sel_param* pb, 269: bool inread) 270: : s(src), ppb(pb), reading(inread) 271: { 272: } 273: 274: bool 275: wsasocketio_control_block::start_overlapped() 276: { 277: clear_overlapped(); 278: 279: error = 0; 280: 281: // num bytes received IF recv completes immediately. 282: DWORD imm_bytes; 283: int recv_res; 284: 285: // set up the single wbuf, bearing in mind we may be part way. 286: wbufs[0].len = ppb->buffer_size - ppb->bytes_written; 287: wbufs[0].buf = ppb->buffer + ppb->bytes_written; 288: 289: // fprintf(stderr, "sockio: %p->finished = %i, reading = %i\n", 290: // ppb, ppb->finished(), reading); 291: 292: // Ideally, we would like to be able to use MSG_WAITALL, which would 293: // let us only get a completion packet when either all the data was 294: // available or the connection had been closed or shutdown. 295: // Unfortunately this is not possible for non-blocking sockets, so 296: // we have to take whatever we get and then call WSARecv again. 297: 298: //#define MSG_WAITALL 0 // not defined in cygwin - apparently this 299: //DWORD flags = MSG_WAITALL; 300: 301: // ah, unfortunately MSG_WAITALL is not supported for non blocking sockets 302: // we'll just have to do it ourselves 303: DWORD flags = MSG_PARTIAL; 304: 305: // completion routines! (unused) 306: if(reading) 307: recv_res = WSARecv(s, wbufs, NUM_WBUFS, &imm_bytes, &flags, &ol, NULL); 308: else 309: recv_res = WSASend(s, wbufs, NUM_WBUFS, &imm_bytes, flags, &ol, NULL); 310: 311: // don't know if I need to check non winsock errs 312: 313: switch(recv_res) 314: { 315: case 0: 316: { 317: // flags are updated to indicate what? if there was a callback, it 318: // would be scheduled to be called when this thread is in the 319: // waitable state, whatever that means. 320: //fprintf(stderr, 321: // "WSA%s completed immediately!!! nbytes: %li, flags: %lx\n", 322: // (reading) ? "Recv" : "Send", imm_bytes, flags); 323: 324: // looks like we get the completion packet even if we do finish 325: // immediately so let the iocp wake us. note that this method 326: // of manually calling iocp_op_finished is not so great as we 327: // don't know the (as yet unused) udat cookie and so set it to 0. 328: // fprintf(stderr, "calling finished manually (ppb=%p)\n", ppb); 329: // iocp_op_finished(imm_bytes, 0, &ol, NO_ERROR); 330: 331: // false because iocp_op_finished will wake us. I guess false from 332: // these guys means that a completion packet is in the mail and 333: // true means that it isn't (in the mail). 334: return false; 335: } 336: break; 337: case SOCKET_ERROR: 338: { 339: DWORD err = WSAGetLastError(); 340: 341: // normal mode - wait for completion 342: // fyi, xp pro seems to mostly give us ERROR_IO_PENDING 343: if(ERROR_IO_PENDING == err || WSA_IO_PENDING == err) 344: { 345: // fprintf(stderr,"WSA%s pending completion (%li)\n", 346: // (reading) ? "Recv" : "Send", err); 347: return false; 348: } 349: 350: fprintf(stderr,"WSARecv/Send returned SOCKET_ERR: %li\n", err); 351: return true; // assume it's bad and we won't get a wakeup 352: } 353: break; 354: default: 355: { 356: fprintf(stderr,"WSARecv/Send returned other error: %i, GetLastError: %li\n", 357: recv_res, GetLastError()); 358: return true; // wake up 359: } 360: break; 361: } 362: 363: return false; 364: } 365: 366: // NB: called by iocp::wait, so be aware of which thread is doing what, lest 367: // you be surprised by this being called asynchronously. 368: void 369: wsasocketio_control_block::iocp_op_finished(DWORD nbytes, ULONG_PTR udat, 370: LPOVERLAPPED olp, int err) 371: { 372: error = err; // copy back for others to look at. 373: // perhaps move back to iocpwakeup 374: // fprintf(stderr, "wsasocketio::finished: ppb=%p, nbytes=%li, err=%i, read=%i\n", 375: // ppb, nbytes, err, reading); 376: 377: if(err) 378: { 379: fprintf(stderr, "wsasocketio, got error: %i\n", err); 380: } 381: 382: // fprintf(stderr,"wsa_socketio wakeup, nb: %li, err: %i\n", nbytes, err ); 383: assert( !ppb->finished() ); 384: // keep track of bytes received. 385: ppb->bytes_written += nbytes; 386: 387: if(0 == nbytes) 388: { 389: fprintf(stderr, "wsaiosocket got zero bytes: err=%i, read=%i\n", 390: err, reading); 391: } 392: 393: // if we're not finished, we have to reinstall our request 394: // zero bytes indicates shutdown/closure, right? 395: // might be using this for WSASend. Instead of broken pipes on win32, 396: // instead we get WSAECONNRESET (pretty sure) on write. On read? 397: // not sure about this - I don't think we have to check nbytes == 0 398: if(0 == nbytes || ppb->finished()) 399: { 400: return; 401: } 402: else 403: { 404: // go back around again 405: fprintf(stderr,"didn\'t get everything (%li of %li bytes)\n", 406: ppb->bytes_written, ppb->buffer_size); 407: if(start_overlapped()) 408: { 409: fprintf(stderr, "UM, socket restart finished immediately\n"); 410: fprintf(stderr, "causes new wakeup? or should I loop around?\n"); 411: } 412: } 413: } 414: 415: 416: // file/pipe io using ReadFile and WriteFile 417: 418: winfileio_control_block::winfileio_control_block(HANDLE f, void* buf, int len, 419: bool inread) 420: : reading(inread), file(f) 421: { 422: // pb is not so useful here. we only want to 423: // know num bytes written/processed. 424: pb.buffer = (char*)buf; 425: pb.buffer_size = len; 426: pb.bytes_written = 0; 427: } 428: 429: // if file is opened with FILE_FLAG_OVERLAPPED, we can do "immutable file ptr" 430: // ops & set the desired offset within the overlapped. can also stick an 431: // event to signal in there. 432: bool 433: winfileio_control_block::start_overlapped() 434: { 435: // fprintf(stderr,"winfileio_cb::start_overlapped: reading=%i\n", reading); 436: clear_overlapped(); 437: 438: // DWORD imm_bytes; 439: BOOL success; 440: 441: // don't need bytes read, written when we have an OVERLAPPED 442: if(reading) 443: // success = ReadFile(file, pb.buffer, pb.buffer_size, &imm_bytes, &ol); 444: success = ReadFile(file, pb.buffer, pb.buffer_size, NULL, &ol); 445: else 446: //success = WriteFile(file, pb.buffer, pb.buffer_size, &imm_bytes, &ol); 447: success = WriteFile(file, pb.buffer, pb.buffer_size, NULL, &ol); 448: 449: // fprintf(stderr,"immbytes = %li\n", imm_bytes); 450: 451: if(!success) 452: { 453: int err = GetLastError(); 454: 455: // I'm getting this for unfinished 456: if(ERROR_IO_PENDING == err) 457: { 458: return false; // sleep on 459: } 460: else 461: { 462: fprintf(stderr,"%sFile failed! (%li)\n", (reading) ? "Read" : "Write", 463: err); 464: return true; // ask for wakeup 465: } 466: 467: // fprintf(stderr,"%sFile failed! (%li)\n", (reading) ? "Read" : "Write", 468: // GetLastError()); 469: // fprintf(stderr,"do I still get completion packet???\n"); 470: // assume not 471: } 472: 473: return false; // sleep on 474: } 475: 476: }} 477:
1: #line 977 "./lpsrc/flx_iocp_demux.ipk" 2: #ifndef __FLX_DEMUX_OVERLAPPED_H__ 3: #define __FLX_DEMUX_OVERLAPPED_H__ 4: #include "flx_demux_config.hpp" 5: 6: // visual studio is quite sensitve about how you do these includes. 7: #include <WinSock2.h> // Winsock2 (WSABufs, etc) must come before Windows.h 8: #include "demux_iocp_demuxer.hpp" // this header file include Windows.h 9: #include <MSWSock.h> // AcceptEx, TF_REUSE_SOCKET, etc 10: 11: namespace flx { namespace demux { 12: 13: // rename these to control block something or other 14: // get rid of default constructors - faio can worry about that. 15: 16: // WARNING: in some "immediate completion" cases I have to call 17: // the finished function myself - in these cases I set the udat to 0, making 18: // it not very reliable. Either make sure the user understands immediate 19: // finish (and does it themselves) or keep a copy of udat. 20: 21: // listener socket must be already associated with an IOCP 22: // in doing an AcceptEx, it might succeed immediately - do you still 23: // get the IOCP wakeup? 24: class DEMUX_EXTERN acceptex_control_block : public iocp_wakeup { 25: enum { ACCEPTEX_ADDR_SIZE = sizeof(SOCKADDR_IN) + 16 }; 26: 27: public: 28: SOCKET listener, acceptor; 29: // there are two of these! 30: char accept_buf[2*ACCEPTEX_ADDR_SIZE]; 31: 32: virtual bool start_overlapped(); 33: 34: acceptex_control_block() 35: : listener(INVALID_SOCKET), acceptor(INVALID_SOCKET) {} 36: }; 37: 38: class DEMUX_EXTERN connectex_control_block : public iocp_wakeup 39: { 40: public: 41: // move further back? 42: int socket_err; // outgoing 43: 44: // can have buffer to be sent on connection 45: SOCKET s; // previously unbound socket 46: const char* addy; // ipv4 address 47: int p; // port number 48: 49: // socket_err undefined 50: connectex_control_block() : s(INVALID_SOCKET), addy(0), p(0) {} 51: 52: // see posix version of this, try to keep them in sync. give socket_err 53: // initial definition that works with this? 54: bool finished() { return ERROR_IO_PENDING != socket_err; } 55: 56: virtual bool start_overlapped(); 57: }; 58: 59: // TransmitFile here (requires file handle) 60: class DEMUX_EXTERN transmitfile_control_block : public iocp_wakeup { 61: SOCKET s; 62: HANDLE file; 63: DWORD flags; // for possible socket reuse. 64: public: 65: 66: transmitfile_control_block(SOCKET dst) // for reuse of socket 67: : s(dst), file(NULL), flags(TF_DISCONNECT | TF_REUSE_SOCKET) {} 68: 69: transmitfile_control_block(SOCKET dst, HANDLE src) // actual transmitfile 70: : s(dst), file(src), flags(0) {} 71: 72: virtual bool start_overlapped(); 73: }; 74: 75: 76: // handles both WSASend & WSARecv 77: class DEMUX_EXTERN wsasocketio_control_block : public iocp_wakeup { 78: protected: 79: enum { NUM_WBUFS = 1 }; // just one for now, but can do scattered send/recvs 80: WSABUF wbufs[NUM_WBUFS]; 81: public: 82: SOCKET s; 83: sel_param* ppb; // on input what you want, on output what you got 84: int error; 85: bool reading; // else use WSASend 86: 87: // watch the memory interfaces here, going back and forth between threads. 88: wsasocketio_control_block(SOCKET src, sel_param* pb, bool read); 89: 90: virtual bool start_overlapped(); 91: 92: virtual void iocp_op_finished( DWORD nbytes, ULONG_PTR udat, 93: LPOVERLAPPED olp, int err); 94: }; 95: 96: // winfileio_control_block, much like wsasocketio_control_block 97: class DEMUX_EXTERN winfileio_control_block : public iocp_wakeup { 98: bool reading; 99: public: 100: sel_param pb; 101: HANDLE file; // I like to modify this from the outside 102: 103: // offset? 104: winfileio_control_block(HANDLE f, void* buf, int len, bool read); 105: 106: virtual bool start_overlapped(); 107: 108: // NB: no iocp_op_finished callback. defined by users of the class. 109: }; 110: 111: }} 112: 113: #endif 114:
1: #line 1092 "./lpsrc/flx_iocp_demux.ipk" 2: #ifndef __FLX_DEMUX_WSELF_PIPER_H__ 3: #define __FLX_DEMUX_WSELF_PIPER_H__ 4: 5: #include <flx_demux_config.hpp> 6: #include "demux_overlapped.hpp" // I use readfile control block 7: #include "demux_wself_piper.hpp" 8: 9: namespace flx { namespace demux { 10: 11: class DEMUX_EXTERN auto_handle { 12: public: 13: HANDLE h; 14: 15: auto_handle(); 16: ~auto_handle(); 17: }; 18: 19: // win32 self pipe trick (what a pain!) 20: class DEMUX_EXTERN wpipe_pair { 21: enum { READ_END, WRITE_END }; 22: // HANDLE pipe[2]; // 0 = read end, 1 = write end 23: auto_handle pipe[2]; 24: public: 25: wpipe_pair(); 26: 27: void write_byte(); 28: HANDLE get_read_end() { return pipe[READ_END].h; } 29: }; 30: 31: // use a winfileio_control_block to install a nonblocking ReadFile on the 32: // read end of the pipe pair. when we get a byte we can execute whatever 33: // the user wanted which for win32 which seems to be naturally responsive 34: // to new sockets/handles. demux quit will probably be the only operation 35: // needed. 36: class DEMUX_EXTERN wself_piper_wakeup : public winfileio_control_block 37: { 38: char the_byte; 39: 40: public: 41: // possibly null, if not, called on iocp_op_finished 42: demux_callback* cb; 43: 44: // the demuxer. doesn't actually get passed by iocp_op_finished 45: iocp_demuxer* d; 46: 47: 48: wself_piper_wakeup(); 49: 50: // detect when single byte read has finished and exec callback, 51: // re-arming. 52: virtual void iocp_op_finished(DWORD nbytes, ULONG_PTR udat, 53: LPOVERLAPPED olp, int err); 54: 55: void arm(); 56: }; 57: 58: // at the very least, the read end must be nonblocking and associated 59: // with the iocp. 60: class DEMUX_EXTERN wself_piper { 61: wpipe_pair pp; 62: wself_piper_wakeup spw; 63: public: 64: void install(demuxer* demux, demux_callback* cb = 0); 65: void wake(); // wakes demuxer which calls callback 66: }; 67: 68: } } // demux, flx 69: 70: #endif 71:
1: #line 1164 "./lpsrc/flx_iocp_demux.ipk" 2: 3: #include "demux_wself_piper.hpp" 4: #include <stdio.h> 5: 6: namespace flx { namespace demux { 7: 8: auto_handle::auto_handle() 9: { 10: h = INVALID_HANDLE_VALUE; 11: } 12: 13: auto_handle::~auto_handle() 14: { 15: if(INVALID_HANDLE_VALUE == h) return; // done 16: 17: if(!CloseHandle(h)) 18: fprintf(stderr, "auto CloseHandle failed: %i\n", GetLastError()); 19: } 20: 21: wpipe_pair::wpipe_pair() 22: { 23: // fprintf(stderr, "wpipe CTOR\n"); 24: // made bufsize 1 as we only ever read and write 1 byte at a time 25: 26: // looks like I can't use anonymous pipes with iocp. will have to 27: // use a specially setup named pipe - one that allows repetitions... 28: // this doesn't sound good. 29: // This will be a problem if we have multiple event waiting threads. 30: // If I don't get the flags right it will also be a problem if we 31: // have multiple instances/apps running. I don't wan't this resource 32: // to be globally visible. Flags? 33: const char* pname = "\\\\.\\pipe\\flx_iocp_quitter"; 34: 35: // don't actually need duplex, nor those buffers. 1 byte at a time suffices 36: // I probably don't need both ends to be marked as nonblocking either, should 37: // only need the read end nonblocking. 38: 39: // create pipe 40: pipe[READ_END].h = CreateNamedPipe(pname, 41: PIPE_ACCESS_DUPLEX | FILE_FLAG_OVERLAPPED, 42: PIPE_TYPE_BYTE, 1, 256, 256, 0, NULL); 43: 44: if(INVALID_HANDLE_VALUE == pipe[READ_END].h) 45: { 46: fprintf(stderr, "couldn't create named pipe: %i\n", GetLastError()); 47: throw -1; 48: } 49: 50: // this is the part that I don't like - this pipe's name isn't unique 51: // and so theoretically another iocp quitter could join here. a race! 52: 53: // connect to it. note that overlapped isn't needed for write end as 54: // we want to block. 55: pipe[WRITE_END].h = CreateFile(pname, FILE_READ_DATA | FILE_WRITE_DATA, 56: FILE_SHARE_READ | FILE_SHARE_WRITE, NULL, OPEN_EXISTING, 57: FILE_ATTRIBUTE_NORMAL | FILE_FLAG_OVERLAPPED, NULL); 58: 59: if(INVALID_HANDLE_VALUE == pipe[WRITE_END].h) 60: { 61: fprintf(stderr, "failed to open named pipe: %i\n", GetLastError()); 62: throw -1; 63: } 64: 65: // anonymous pipes can't be made nonblocking/iocpable on windows! 66: // What a shame! 67: /* 68: if(!CreatePipe(&pipe[READ_END], &pipe[WRITE_END], NULL, 1)) 69: { 70: fprintf(stderr, "wpipe_pair CreatePipe failed: %i\n", GetLastError()); 71: throw -1; 72: } 73: */ 74: } 75: 76: void 77: wpipe_pair::write_byte() 78: { 79: // I think I want a blocking write here. 80: char b = 1; 81: DWORD bytes_written; 82: // last arg is overlapped pointer, unused, we want to block. 83: if(!WriteFile(pipe[WRITE_END].h, &b, 1, &bytes_written, NULL)) 84: fprintf(stderr, "wpipe_pair failed to write byte: %i\n", 85: GetLastError()); 86: } 87: 88: void 89: wself_piper::install(demuxer* d, demux_callback* cb) 90: { 91: fprintf(stderr, "wself_piper::install(%p, %p)\n", d, cb); 92: iocp_demuxer* demux = static_cast<iocp_demuxer*>(d); 93: 94: // make read end non blocking and associate with iocp 95: HANDLE read_end = pp.get_read_end(); 96: 97: #if 0 98: // make the anonymous pipe non blocking. this function is for named pipes, 99: // but I've heard talk that it works for anon pipes too. Nope, doesn't work. 100: DWORD pipe_mode = PIPE_NOWAIT; 101: if(!SetNamedPipeHandleState(read_end, &pipe_mode, NULL, NULL)) 102: { 103: fprintf(stderr, "SetNamedPipeHandleState failed: %i\n", GetLastError()); 104: return; // not much to be done here. 105: } 106: #endif 107: 108: if(0 != demux->associate_with_iocp(read_end, NULL)) 109: { 110: fprintf(stderr, "failed to install self pipe in IOCP!!!\n"); 111: return; // error code? 112: } 113: 114: // copy into the self pipe wakeup, for its later use. 115: spw.d = demux; 116: spw.cb = cb; 117: spw.file = read_end; 118: 119: fprintf(stderr, "initial self pipe arm\n"); 120: spw.arm(); 121: } 122: 123: // wakes demuxer 124: void 125: wself_piper::wake() 126: { 127: fprintf(stderr, "wself_piper::wake - write a byte\n"); 128: pp.write_byte(); 129: } 130: 131: wself_piper_wakeup::wself_piper_wakeup() 132: // configure the control block for ReadFile on the read end pipe 133: // will set pipe handle later, read = true 134: : winfileio_control_block(INVALID_HANDLE_VALUE, NULL, 0, true), 135: cb(0), d(0) 136: { 137: // I'll probably need to reset the byte address 138: fprintf(stderr, "SET UP THE PIPE HANDLE!\n"); 139: } 140: 141: // at this point the byte has already been read. we want to re-arm for future 142: // wakeups. 143: void 144: wself_piper_wakeup::iocp_op_finished(DWORD nbytes, ULONG_PTR udat, 145: LPOVERLAPPED olp, int err) 146: { 147: fprintf(stderr, "wself_piper_wakeup::iocp_op_finished\n"); 148: fprintf(stderr, "nbytes=%i, err=%i\n", nbytes, err); 149: fprintf(stderr, "about to callback %p(%p)\n", cb, d); 150: 151: if(cb) cb->callback(d); 152: 153: arm(); // re-arm 154: } 155: 156: void 157: wself_piper_wakeup::arm() 158: { 159: // exec another nonblocking ReadFile on read end of pipe 160: fprintf(stderr, "wself_piper_wakeup::arm\n"); 161: pb.buffer = &the_byte; 162: pb.buffer_size = 1; 163: pb.bytes_written = 0; 164: if(start_overlapped()) 165: fprintf(stderr, "WARNING: wslef_pipe install completed immediately\n"); 166: } 167: 168: } } 169: 170: 171: 172:
1: #line 3 "./lpsrc/flx_wintimer_demux.ipk" 2: #ifndef __FLX_DEMUX_WIN_TIMER_QUEUE_H__ 3: #define __FLX_DEMUX_WIN_TIMER_QUEUE_H__ 4: 5: #include "flx_demux_config.hpp" 6: #include <Windows.h> 7: 8: #include "demux_timer_queue.hpp" 9: 10: namespace flx { namespace demux { 11: 12: class DEMUX_EXTERN win_timer_queue : public timer_queue 13: { 14: HANDLE timer_queue; 15: 16: static VOID CALLBACK timer_callback(PVOID, BOOLEAN); 17: public: 18: win_timer_queue(); 19: ~win_timer_queue(); 20: 21: virtual void add_sleep_request(sleep_task* st, double delta); 22: virtual void add_abs_sleep_request(sleep_task* st, double when); 23: 24: }; 25: 26: }} 27: 28: #endif // __SLEEP_TASK__ 29:
1: #line 33 "./lpsrc/flx_wintimer_demux.ipk" 2: #include "flx_demux_config.hpp" 3: #include <Windows.h> 4: #include <assert.h> 5: 6: // simply wrapped windows timer queue. requires windows 5.00, which is 7: // quite high (xp?) because I couldn't get the waitable timers to work. 8: // must be careful with this stuff lest it create millions of threads. 9: #include "demux_win_timer_queue.hpp" 10: 11: #include <stdio.h> 12: 13: namespace flx { namespace demux { 14: 15: timer_queue *mk_timer_queue() { return new win_timer_queue; } 16: 17: #define MIL 1000000 // 1 metric million 18: 19: typedef struct 20: { 21: sleep_task* st; // so we can make it fire 22: HANDLE timer; // we need to delete the timer, so we keep it 23: HANDLE timer_queue; // AND its queue (no back ptrs, I guess) 24: } timer_cookie; 25: 26: win_timer_queue::win_timer_queue() 27: { 28: // fprintf(stderr,"win_timer_queue ctor\n"); 29: 30: timer_queue = CreateTimerQueue(); 31: if(!timer_queue) 32: { 33: fprintf(stderr, "CreateTimerQueue failed: %i\n", GetLastError()); 34: throw -1; 35: } 36: // fprintf(stderr, "created timer queue: %p\n", timer_queue); 37: } 38: 39: 40: win_timer_queue::~win_timer_queue() 41: { 42: // INVALID_HANDLE_VALUE indicates that DeleteTimerQueueEx should wait for 43: // all callback functions to complete before returning. One would hope that 44: // calling this causes all the timers to go off before their time (what 45: // else would the "actually fired" callback flag be for?). The alternative 46: // of waiting for some ever distant timer to go off would be too stupid 47: // for words. As usual, the msdn glosses over the important details like 48: // this one. Anyway, it's easy to test out... No, that flag's always true 49: // for timers, and this wait option doesn't work - maybe with other types 50: // flags for CreateTimerQueueTimer? 51: if(!DeleteTimerQueueEx(timer_queue, INVALID_HANDLE_VALUE)) 52: { 53: fprintf(stderr, "DeleteTimerQueueEx failed: %i\n", GetLastError()); 54: // whatcha gonna do about it? 55: } 56: // fprintf(stderr, "finished - did it wait?\n"); 57: } 58: 59: // note: may not need time to be in sleep_task. could pass time here. 60: // thread safe 61: void 62: win_timer_queue::add_sleep_request(sleep_task* st, double delta) 63: { 64: // fprintf(stderr,"add_sleep_request: %lf to %p\n", delta, timer_queue); 65: 66: timer_cookie* tc = new timer_cookie; 67: 68: // copy in the sleep_task and the timer queue 69: tc->st = st; 70: tc->timer_queue = timer_queue; 71: 72: // the timer thread may not be the best solution as nothing is stopping 73: // anyone from performing long operations with this structure, however 74: // in all likelihood, it'll just be felix adding threads back to its queue. 75: if(!CreateTimerQueueTimer( 76: &tc->timer, // resulting timer in timer_cookie 77: timer_queue, 78: //NULL, // add to default timer queue 79: timer_callback, // should get called in delta seconds 80: tc, // timer cookie is user data 81: (DWORD)(delta*1000), // millisecond timer 82: 0, // zero period => signal once 83: WT_EXECUTEINTIMERTHREAD)) // NB: for short tasks (will this do?) 84: { 85: fprintf(stderr, "CreateTimerQueueTimer failed: %i\n", GetLastError()); 86: delete tc; // at least try not to leak 87: return; 88: } 89: } 90: 91: // this is a c callback - all the c++ code should probably be wrapped 92: // in a try/catch. timer_or_wait_fired is always true for timers. 93: VOID CALLBACK 94: win_timer_queue::timer_callback(PVOID udat, BOOLEAN timer_or_wait_fired) 95: { 96: timer_cookie* tc = (timer_cookie*)udat; 97: 98: // fprintf(stderr, "timer queue callback fired: %p, %i\n", 99: // tc, timer_or_wait_fired); 100: 101: if(!tc) 102: { 103: // Nothing that we can do in this situation. 104: fprintf(stderr, "WHOA - NULL queue cookie! (fired: %i)\n", 105: timer_or_wait_fired); 106: return; // outta here 107: } 108: 109: // NULL means delete the thing now, INVALID_HANDLE_VALUE means wait until 110: // callback finishes. We're in the callback, so we can't do that (=deadlock 111: // of the timer thread, which isn't good). We're all adults here, the timer 112: // has expired, we know what we're doing, so lets just delete it. 113: tc->st->fire(); 114: 115: // on my box this returns ERROR_IO_PENDING, on others it doesn't 116: // msdn says this should be ok, but I'm not so sure. 117: if(!DeleteTimerQueueTimer(tc->timer_queue, tc->timer, NULL)) 118: { 119: int err = GetLastError(); 120: 121: if( ERROR_IO_PENDING != err) 122: { 123: fprintf(stderr, "DeleteTimerQueueTimer of %p failed: %i\n", 124: tc->timer, err); 125: } 126: else 127: { 128: // I'm not so sure, see if it leaks. 129: fprintf(stderr, "DeleteTimerQueueTimer = ERROR_IO_PENDING\n"); 130: fprintf(stderr, "Apparently this is ok...\n"); 131: } 132: } 133: delete tc; 134: 135: // fprintf(stderr, "leaving timer callback\n"); 136: } 137: 138: // in seconds from some ref pt (UTC for this fn) 139: // N.B. declared in base class! 140: void 141: timer_queue::get_time(double& t) 142: { 143: SYSTEMTIME sysnow; 144: GetSystemTime(&sysnow); 145: // now convert to seconds 146: // via FILETIME? 147: 148: // kinda sucks, but is the msdn recommended way of doing calculations 149: // on dates. 150: FILETIME fnow; 151: if(!SystemTimeToFileTime(&sysnow, &fnow)) 152: { 153: fprintf(stderr, "SystemTimeToFileTime failed: %i\n", GetLastError()); 154: t = 0; 155: return; 156: } 157: 158: ULARGE_INTEGER now; // so we can do some maths 159: 160: assert(sizeof(now) == sizeof(fnow)); 161: memcpy(&now, &fnow, sizeof(now)); 162: 163: // and now we have a big integer containing an offset jan 1, 1601 (UTC) 164: // 100 nanosecond intervals 165: t = now.QuadPart*MIL*10; // *10 to microseconds, *MIL to seconds 166: } 167: 168: void 169: win_timer_queue::add_abs_sleep_request(sleep_task* st, double when) 170: { 171: // win timer queue works with relative offsets, so convert this absolute 172: double now; 173: get_time(now); 174: double delta = when-now; 175: if(delta < 0.0) delta = 0.0; 176: add_sleep_request(st, delta); 177: } 178: 179: }} 180: 181:
1: #line 4 "./lpsrc/flx_posixtimer_demux.ipk" 2: #ifndef __FLX_DEMUX_POSIX_TIMER_QUEUE_H__ 3: #define __FLX_DEMUX_POSIX_TIMER_QUEUE_H__ 4: 5: #include "pthread_thread.hpp" // flx_thread_t 6: #include "pthread_mutex.hpp" // flx_mutex_t 7: #include "pthread_condv.hpp" // flx_condv_t 8: #include "demux_timer_queue.hpp" // base class 9: #include <sys/time.h> // timespecs, gettimeofday 10: 11: namespace flx { namespace demux { 12: 13: // looks like a worker queue, but couldn't quite mash it into one 14: class DEMUX_EXTERN posix_timer_queue : public timer_queue 15: { 16: flx::pthread::flx_mutex_t lock; // factor to prio queue? 17: flx::pthread::flx_condv_t sleep_cond; 18: flx::pthread::flx_thread_t sleep_thread; // joinable, we join later 19: void* opaque_prio_queue; // less fat 20: 21: static void thread_start(void*); // passed "this" 22: bool thread_loop_body(); 23: 24: 25: void wakeup_thread(); // we can do this! 26: 27: void add_sleep_request(sleep_task* st, timespec* abs); 28: public: 29: posix_timer_queue(); 30: ~posix_timer_queue(); 31: 32: // thread safe. 33: virtual void add_sleep_request(sleep_task* st, double delta); 34: 35: // in seconds, relative to same base as timer::get_time. 36: virtual void add_abs_sleep_request(sleep_task* st, double when); 37: }; 38: 39: }} 40: 41: #endif // __POSIX_TIMER_QUEUE__ 42:
1: #line 47 "./lpsrc/flx_posixtimer_demux.ipk" 2: #include "demux_posix_timer_queue.hpp" 3: 4: // a prio queue that executes tasks in a given order 5: // factor out prio_queue? could be like queue. 6: 7: // try to make work like the worker thread thing, fix it do so?. 8: // remove time from sleep task... 9: 10: #include <queue> // stl seems to have a prio_queue 11: #include <sys/time.h> // gettimeofday for calculating "now" 12: 13: //using namespace flx::pthread; 14: namespace flx { namespace demux { 15: 16: timer_queue *mk_timer_queue() { return new posix_timer_queue; } 17: 18: 19: #define MIL 1000000 // one million 20: #define BIL (MIL*1000) // one billion (metric) 21: 22: using namespace std; 23: 24: // it could happen! 25: // factor 26: class future_evt 27: { 28: public: 29: timespec when; 30: sleep_task* task; 31: 32: // ignore the direction, just trying to sort with smallest first 33: bool operator<(const future_evt& rhs) const 34: { 35: if(when.tv_sec != rhs.when.tv_sec) // precedence to more significant 36: return when.tv_sec > rhs.when.tv_sec; 37: else // else check the less significant 38: return when.tv_nsec > rhs.when.tv_nsec; 39: } 40: }; 41: 42: typedef priority_queue<future_evt> void_prio_queue; 43: #define PRIOQ ((void_prio_queue*)opaque_prio_queue) 44: 45: posix_timer_queue::posix_timer_queue() 46: { 47: opaque_prio_queue = new void_prio_queue; // a.k.a. PRIOQ 48: //fprintf(stderr,"initing timer sleep thread\n"); 49: 50: // NEED'S TO CHECK RETURN VAL AND HANDLE ERROR 51: if(sleep_thread.init(thread_start, this)) 52: fprintf(stderr, "failed to create posix timer queue thread!\n"); 53: } 54: 55: posix_timer_queue::~posix_timer_queue() 56: { 57: // the sleep_thread uses the prioq, so we must explicitly shut it 58: // down now, before we delete the prioq. left to its own devices, 59: // c++ destructs it at the end of this destructor. 60: 61: // take down the thread first because it uses all the other stuff. 62: // I actually don't need to do anything special to bring the thread 63: // down because all pthread_cond_*wait* are cancel aware. Or so they 64: // should be. As far as I can tell only the 64bit osx10.4.2 is, so 65: // for now the explicit cancel + wakeup followed by explicit 66: // cancel test stays. 67: 68: // fprintf(stderr, "asking timer thread to quit\n"); 69: add_sleep_request(NULL, 0.0); // super secret quit thread quit request 70: wakeup_thread(); // wakeup, cause to goto a cancel pt 71: 72: sleep_thread.join(); // will join 73: //fprintf(stderr,"about to delete PRIOQ\n"); 74: delete PRIOQ; 75: } 76: 77: static void 78: get_now(timespec* now) 79: { 80: struct timeval tp; 81: 82: if(gettimeofday(&tp, NULL) == -1) 83: perror("gettimeofday"); 84: 85: // (10^6-1)*1000 = 3B9AC618 = max usec -> nsec fits in a 32bit long. 86: now->tv_sec = tp.tv_sec; 87: now->tv_nsec = tp.tv_usec*1000; // fits! 88: 89: // fprintf(stderr,"get_now = %li, %li\n", now->tv_sec, now->tv_nsec); 90: } 91: 92: // LIMIT! 93: // seconds to microseconds - signed this gives a bit over half an hour 94: #define SEC2TIMESPEC(ts, t) long wait_musec = (long)(t*MIL); \ 95: timespec ts = { wait_musec / MIL, (wait_musec % MIL)*1000 } 96: 97: 98: // offset delta from "now" and store in "when" 99: static void 100: calc_when(timespec* when, double delta) 101: { 102: // how to use the posix abstime versions of timed waits? what kind of absolute 103: // is abstime? pthread_get_expiration_np looks useful, but it too is np. 104: // abstime is apparently in seconds since the Epoch, UTC. 105: // To get now there's clock_gettime (not portable) or gettimeofday with 106: // null timezone. 107: 108: timespec now; 109: get_now(&now); 110: 111: // limit! 112: // seconds to microseconds - signed this gives a bit over half an hour 113: // long wait_musec = (long)(delta*MIL); 114: // timespec delay = { wait_musec / MIL, (wait_musec % MIL)*1000 }; 115: SEC2TIMESPEC(delay, delta); 116: 117: // (10^6-1)*1000 = 3B9AC618 = max usec -> nsec fits in a 32bit long. 118: when->tv_sec = now.tv_sec + delay.tv_sec; 119: when->tv_nsec = now.tv_nsec + delay.tv_nsec; 120: 121: if(when->tv_nsec >= BIL) // overflow of nanoseconds? 122: { 123: // fprintf(stderr,"OVERFLOW = %li, %li\n", when->tv_sec, when->tv_nsec); 124: // x, y < BIL, x + y < 2BIL 125: when->tv_sec++; 126: when->tv_nsec -= BIL; 127: // when->tv_sec += when->tv_nsec/BIL; 128: // when->tv_nsec %= BIL; 129: } 130: 131: // fprintf(stderr,"when = %li, %li\n", when->tv_sec, when->tv_nsec); 132: // tp contains tv_sec (seconds) & tv_usec (microseconds) both longs. 133: // however, if nonposix works everywhere... 134: } 135: 136: // absolute time 137: void 138: posix_timer_queue::add_sleep_request(sleep_task* st, timespec* abs) 139: { 140: future_evt evt; 141: evt.task = st; 142: evt.when = *abs; 143: 144: flx::pthread::flx_mutex_locker_t locker(lock); 145: 146: PRIOQ->push(evt); 147: 148: // we may have inserted at sooner than any other evt, so wake up thread 149: // to figure it out (if need be). I seemed to be getting more wakeups 150: // with this. Turned off for now. Not sure how that works. 151: if(1 || PRIOQ->top().task == st) 152: { 153: // fprintf(stderr,"WE PUSHED IN - waking thread\n"); 154: wakeup_thread(); 155: } 156: } 157: 158: // note: may not need time to be in sleep_task. could pass time here. 159: // thread safe 160: void 161: posix_timer_queue::add_sleep_request(sleep_task* st, double delta) 162: { 163: // fprintf(stderr,"add_sleep_request: %lf\n", delta); 164: timespec when; 165: calc_when(&when, delta); // calculate when (t a delta) 166: 167: add_sleep_request(st, &when); 168: } 169: 170: void 171: posix_timer_queue::add_abs_sleep_request(sleep_task* st, double when) 172: { 173: // absolute version is closer to the posix implementation 174: SEC2TIMESPEC(abs_time, when); 175: add_sleep_request(st, &abs_time); 176: } 177: 178: // cause the timer wait thread to wake up. useful for asking it to 179: // exit or re-evaluate a changed sleep queue. 180: void 181: posix_timer_queue::wakeup_thread() 182: { 183: sleep_cond.signal(); 184: } 185: 186: void 187: posix_timer_queue::thread_start(void* udat) 188: { 189: posix_timer_queue* q = (posix_timer_queue*)udat; 190: //fprintf(stderr,"sleeper thread\n"); 191: 192: while(q->thread_loop_body()) ; 193: } 194: 195: bool 196: posix_timer_queue::thread_loop_body() 197: { 198: // lock on. lock off when waiting on condition 199: flx::pthread::flx_mutex_locker_t locker(lock); 200: 201: // int res; 202: 203: // pthread_cond_wait & pthread_cond_timedwait (& np rel version?) are 204: // cancellation points. doco notes for timed & untimed waits that the 205: // predicate should be rechecked as there can be spurious wakeups. 206: // no worries, when we wakeup the lock has been acquired. 207: 208: while(!PRIOQ->empty()) 209: { 210: future_evt evt = PRIOQ->top(); 211: 212: // quit request 213: if(!evt.task) return false; 214: 215: future_evt now; // "now' has no task, just a dummy. 216: get_now(&now.when); 217: 218: // if(evt < now) // would prefer <=, eh. 219: // < is arse backwards because I don't know how to use the stl 220: if(now < evt) // would prefer <=, eh. 221: { 222: // fprintf(stderr,"firing of (%li, %li) at (%li, %li)!\n", 223: // evt.when.tv_sec, evt.when.tv_nsec, 224: // now.when.tv_sec, now.when.tv_nsec); 225: evt.task->fire(); 226: PRIOQ->pop(); 227: } 228: else // we have an event in future, so sleep for that long 229: { 230: // remember that condition waits are exit points... 231: // so I don't need to test - check that. 232: // fprintf(stderr,"sleeping from %li, %li until %li, %li\n", 233: // now.when.tv_sec, now.when.tv_nsec, 234: // evt.when.tv_sec, evt.when.tv_nsec); 235: (void)sleep_cond.timedwait(&lock, &evt.when); 236: 237: // if using posix abstime timed wait we make get EINVAL here for 238: // abstimes in the past. must handle this. 239: //JS: It's handled now, waiting for a time in the past is OK 240: 241: // fprintf(stderr,"pthread_cond_timedwait woke up! (%i)\n", res); 242: } 243: } 244: 245: // if we got here then the queue is empty, so sleep indefinitely 246: // that we don't really need the mainloop testcancel because the condition 247: // wait functions are cancellation points. 248: // fprintf(stderr,"no sleep task, sleeping indefinitely\n"); 249: sleep_cond.wait(&lock); 250: // fprintf(stderr,"pthread_cond_wait woke up! (%i)\n", res); 251: 252: // lock released here 253: return true; // keep going 254: } 255: 256: 257: // in seconds from some ref pt 258: // N.B. declared in base class! 259: void 260: timer_queue::get_time(double& t) 261: { 262: timespec now; 263: get_now(&now); // just calls gettimeofday (msec) and converts 264: // to timespec (sec, nsec). could skip that 265: // and call directly, avoiding conversion 266: t = now.tv_sec + (now.tv_nsec*BIL); 267: } 268: 269: }} 270: 271: