1. demux
$Log: flx_demux.pak,v $
Revision 1.32 2006/08/04 07:07:08 skaller
get rid of debugging stuff
Revision 1.31 2006/07/24 18:22:46 skaller
Fix epoll bug which ignored epoll bug in kernels earlier
than 2.6.9 .. epoll_ctl requires a non-NULL event address
even though the argument is not used.
Revision 1.30 2006/07/19 17:57:52 rfistman
wrote self pipe for windows, not happy with it though because it
has to use a named pipe (anon pipes don't work in nonblock mode).
made demux_quitter portable and now quitting iocp demuxer cleanly
added demux callbacks which are currently used for quitting and making
demuxers responsive to new sockets.
fixed unbalanced quote in ./bin/flx script
Revision 1.29 2006/07/14 16:00:28 rfistman
generalised demux quit flag into a condition variable that is picked up and
signalled just before the event thread exits. this was yet another race
condition in which a quit and destructed demuxer was still in use by
its event thread.
made get/set quit flag virtual so that "ts" style demuxers (those that
encapsulate a non ts demuxer) report the quit flag properly.
factored kqueue quit into async_quit for making sure that waiting thread
in thread safe demuxers exits before the demuxer destructor completes.
affects ts_poll_demuxer, ts_select_demuxer, evtport_demuxer
still to do: same for iocp_demuxer.
Revision 1.28 2006/07/13 18:22:55 idadesub
fixes for freebsd. hopefully this won't break any other builds
Revision 1.27 2006/06/22 06:59:28 rfistman
added poll ts_poll demuxer.
conditionalised demuxer usage, so linux no longer implies epoll,
and solaris no longer implies evtports. this means that older
systems should fall back to select and that we may get the rare
nice surprise of finding a system that supports an unexpected
demuxer (like osx10.4 with poll) or a linux with kqueues. who knows.
ooh, case in point: cygwin supports poll. It spews warnings but
it does work.
Revision 1.26 2006/05/30 18:53:22 rfistman
fixed exception handler linkage on win32
flx_rtl_config now stops Windows.h on VS toolchain including winsock vers 1
disabled HAVE_ISBLANK in tre config file to unbreak win32 build
removed async bootstrapping from pkg_config as it's no longer needed
Revision 1.25 2006/04/30 05:56:29 rfistman
minimal change to test commit emails
Revision 1.24 2006/04/30 02:09:37 rfistman
got rid of suckitnsee kqueue config (too complicated with pthread use)
added a quitter to ts_select_demuxer to fix crash
fixed string exception in demo web server caused by POSTs
Revision 1.23 2006/04/27 00:53:03 rfistman
added demux_quitter to take down kqueue nicely. a simple quitting selfpipe
trick was not enough as the kqueue could exit before the selfpipe's callback
had finished. we now have to two ways to get attention on a (for now posix)
demuxer: selfpiper + demux_quitter. quitter will come in handy for other
demuxer whose quit boundary conditions are a little hazy.
Revision 1.22 2006/04/25 10:38:42 rfistman
added solaris libs (-lrt for semaphores, -lnsl, -lsocket) missing from some
pkgs and added -lnsl -lsocket to flx_pkgconfig bootstrap, added missing
namespace to evtport demuxer
Revision 1.21 2006/04/23 06:09:15 rfistman
fixed bug in async win32 ReadFile/WriteFile code - wasn't handling
ERROR_IO_PENDING properly. Was also not opening files with FILE_FLAG_OVERLAPPED
added actual io to windows named pipe example
Revision 1.20 2006/04/22 07:43:37 rfistman
taking down kqueue event source was blocking on osx 10.4.
fixed now, but not perfect yet. similar problems exist with other sources,
giving undefined behaviour (I believe) on takedown. watch this space.
Revision 1.19 2006/04/20 01:57:17 rfistman
factored out self_piper class. will be used in kqueue_demuxer and an
eventual epoll demuxer.
Revision 1.18 2006/04/08 08:18:34 rfistman
fixed up flx_listener, working on portable regression test for tcp
bug still at large in nocygwin flx_accept - hope it's portable!
Revision 1.17 2006/04/08 03:53:44 skaller
Detect vsnprintf.. some Windows don't provide it even though
MSDN says it is available. The MS variation when available
doesn't work correctly anyhow.
Revision 1.16 2006/03/07 04:22:36 skaller
Termination in presence of spawned pthreads should now
be working, using new ts_locker class. Exception handling
on per thread basis not implemented yet.
Revision 1.15 2006/03/06 15:08:08 skaller
Cygwin and MinGW builds.
Revision 1.14 2006/03/06 13:16:16 skaller
Fix library builds so exports and imports are properly
distinguished on a library by library basis.
Revision 1.13 2006/03/06 01:29:29 skaller
spawn_pthread: init works
Revision 1.12 2006/03/04 09:50:42 rfistman
added "suck it and see" for kqueues.
Revision 1.11 2006/03/04 04:26:41 rfistman
making demux configure more "suck it and see". temporarily disabled kqueues
whilst fixing osx 10.2 build.
Revision 1.10 2006/03/03 04:51:24 rfistman
removed the following line from the kqueues code (was breaking the mac build)
don't know what it means, but there are no threads in the kqueue code.:
using namespace flx::pthread;
Revision 1.9 2006/03/02 17:52:51 skaller
Fix flx_pkgconfig to handle linker switches
Revision 1.8 2006/03/02 02:41:39 skaller
Fixes for Win32/MSVC++ build.
Revision 1.7 2006/03/01 14:52:28 skaller
Cygwin needed one extra lib dependency.
Revision 1.6 2006/02/28 02:07:13 skaller
Refactor demux into demux + pthread.
Revision 1.5 2006/02/26 06:39:36 skaller
Fix flx_pkgconfig to conform to new spec.
Revision 1.4 2006/02/25 20:38:12 skaller
Upgrade flx_pkgconfig
Revision 1.3 2006/02/23 19:33:01 skaller
More fiddling with build system
Revision 1.2 2006/02/22 19:00:04 rfistman
oops, was passing ULONG as ULONG_PTR. Thanks to 64bit compiler for
poinnting this out.
Revision 1.1 2006/02/22 17:36:48 skaller
Rename some files.. more coming. Make RTL modules full .paks
Revision 1.54 2006/02/21 05:46:45 skaller
Fix wrong link flag tag.
Revision 1.53 2006/02/20 09:21:24 skaller
Mingw/nocygwin support
Revision 1.52 2006/02/20 02:16:44 rfistman
removed recursive locks from the threadsafe select demuxer and from
the code in general.
Revision 1.51 2006/02/19 16:04:57 skaller
Win32 build changes..seems to work now
Revision 1.50 2006/02/19 14:02:51 skaller
Windows sys libs with right switches
Revision 1.49 2006/02/18 16:30:48 skaller
More work on new package manager.
Revision 1.48 2006/02/17 09:38:31 skaller
conditionalised recursive mutex (to be removed)
fixed windows lib_requires (added lib prefix)
Revision 1.47 2006/02/16 23:39:11 rfistman
fixed cygwin hang in posix tests. cygwin wakes select with an error flag
when you shutdown a socket. other impls seem to wake select, but with no
error.
Revision 1.46 2006/02/16 15:09:45 skaller
MSVC++ package manager
Revision 1.45 2006/02/16 07:51:40 skaller
Replace pkg-config with a Felix program flx_pkgconfig.
Make sure to build it in a timely manner, since flx script
now depends on it.
Revision 1.44 2006/02/15 10:54:08 skaller
Build time packaging system.
Revision 1.43 2006/02/15 04:10:56 rfistman
working on thread safe select demuxer (ts_select_demuxer). that should
fix cygwin probs. and as yet other undiscovered problems.
Revision 1.42 2006/02/13 05:47:40 rfistman
readded recursive flag to (portable) mutexes
Revision 1.41 2006/02/12 06:51:30 rfistman
added WSAID_CONNECTEX and LPFN_CONNECTEX definitions for ming (nocygwin) target
now conditionally define EAGAIN in posix compat layer because ming seems to
have it.
Revision 1.40 2006/02/11 22:12:30 skaller
Allow assigning constructor index to enums.
Revision 1.39 2006/02/09 21:05:33 skaller
Fixed sdl to use polling.
Revision 1.38 2006/02/09 07:53:27 skaller
Fix windows semaphores.
Revision 1.37 2006/02/07 15:55:06 skaller
Added portable semaphores and a monitor class to demux.
Revision 1.36 2006/02/06 11:05:18 skaller
Timed wait on condition variable.
Revision 1.35 2006/02/06 06:50:01 skaller
Added pthread_cond_timedwait and pthread_cond_uswait functions
to condition variables. The latter is my own invention, it waits
for a specific interval in micro-seconds. The later is more
efficient on Windows when you want to wait for an interval,
since this is the native method. Otherwise you need to first
obtain the time of day, do a nasty calculation .. and then
the timedwait function will undo that, resulting in two
unnecessary and expensive system calls.
Revision 1.34 2006/02/04 11:34:36 skaller
Portable demux stuff, Win32 version
Revision 1.33 2006/02/04 10:35:55 skaller
Portable thread sync stuff
Revision 1.31 2006/02/02 18:52:35 skaller
Reorganise demux a bit
Revision 1.30 2006/01/31 04:29:17 rfistman
added demux level "sleep until" code. Not quite right yet (posix format
of current time in double isn't so good as it's measured from the 70s in
microseconds, so macroscopic sleep amounts get swamped by the magnitude
of "now"), so felix binding coming soon.
Revision 1.29 2006/01/29 07:07:21 skaller
fixed visual studio build after demux merge
Revision 1.28 2006/01/29 05:58:08 rfistman
fixed windows build (ming nocygwin) after merge
Revision 1.27 2006/01/29 02:53:16 rfistman
fixed missing epoll header for linux.:w
Revision 1.26 2006/01/29 02:17:29 rfistman
using latest demux, added epoll to flx_run (for linux). bugs fixed.
Revision 1.25 2006/01/26 10:04:28 rfistman
fixed failure to wake fthread after connects that finish immediately.
this fast connect only ever shows up on solaris, so full marks to them.
Revision 1.24 2006/01/21 23:45:10 rfistman
fixed potential leak and warning in posix_timer_queue constructor
Revision 1.23 2006/01/16 01:25:43 rfistman
factored faio posix accept and connect back demux as control blocks
removed pthread cancel from code - implicit cancel points are no longer used.
instead the threads are convinced to return from their mains via specially
formatted inputs.
Revision 1.22 2006/01/13 05:16:50 rfistman
made worker_fifo portable
Revision 1.21 2006/01/11 01:16:32 rfistman
added win_timer_queue to demux
Revision 1.20 2006/01/09 16:32:03 skaller
Integrate SDL tests, provide initial SDL event demux.
#@h=tangler('tmp/flx_demux_config.hpp')
#@select(h)
#// This is a fake flx_demux_config.h to be used at config time, before
#// the rtl proper exists. It contains just enough info to compile
#// a few of the demuxers.
##define DEMUX_EXTERN
Start cpp section to rtl/flx_demux_config.hpp[1
/1
]
1: #line 288 "./lpsrc/flx_demux.pak"
2:
3:
4:
5:
6:
7:
8:
9:
10:
11:
Start cpp section to demux/demux_demuxer.hpp[1
/1
]
1: #line 300 "./lpsrc/flx_demux.pak"
2:
3:
4:
5:
6: namespace flx { namespace demux {
7:
8: typedef struct {
9: char* buffer;
10: long buffer_size;
11: long bytes_written;
12:
13: bool finished() { return bytes_written == buffer_size; }
14: }sel_param;
15:
16:
17:
18: enum { PDEMUX_READ = 1, PDEMUX_WRITE = 2 };
19:
20:
21:
22: class DEMUX_EXTERN demux_quit_flag
23: {
24: public:
25: virtual void signal_true() = 0;
26: virtual ~demux_quit_flag() {}
27: };
28:
29:
30:
31:
32: class DEMUX_EXTERN demuxer {
33: protected:
34:
35:
36:
37:
38: virtual void get_evts(bool poll) = 0;
39:
40:
41:
42: demux_quit_flag* quit_flag;
43: public:
44: demuxer() : quit_flag(0) {}
45: virtual ~demuxer() {}
46:
47: void wait() { get_evts(false); }
48: void poll() { get_evts(true); }
49:
50:
51:
52:
53: virtual demux_quit_flag* get_quit_flag() { return quit_flag; }
54: virtual void set_quit_flag(demux_quit_flag* f) { quit_flag = f; }
55: };
56:
57:
58:
59:
60:
61: class DEMUX_EXTERN demux_callback {
62: public:
63: virtual void callback(demuxer* d) = 0;
64: virtual ~demux_callback() {}
65: };
66:
67: }}
68:
69:
Start cpp section to demux/demux_epoll_demuxer.hpp[1
/1
]
1: #line 370 "./lpsrc/flx_demux.pak"
2:
3:
4:
5:
6:
7:
8: namespace flx { namespace demux {
9:
10:
11:
12:
13:
14:
15:
16:
17: class DEMUX_EXTERN epoll_demuxer : public posix_demuxer {
18: int epoll_fd;
19:
20:
21:
22:
23: void remove_wakeup(int s);
24:
25: virtual void get_evts(bool poll);
26: public:
27: epoll_demuxer();
28: virtual ~epoll_demuxer();
29:
30: virtual int add_socket_wakeup(socket_wakeup* sv, int flags);
31: };
32:
33: }}
34:
35:
Start cpp section to demux/demux_evtport_demuxer.hpp[1
/1
]
1: #line 406 "./lpsrc/flx_demux.pak"
2:
3:
4:
5:
6:
7:
8:
9: namespace flx { namespace demux {
10:
11:
12:
13:
14:
15:
16:
17:
18:
19:
20:
21:
22:
23:
24:
25:
26:
27:
28:
29:
30: class DEMUX_EXTERN evtport_demuxer : public posix_demuxer {
31: int evtport;
32:
33:
34:
35: void remove_wakeup(int s);
36:
37: virtual void get_evts(bool poll);
38: public:
39: evtport_demuxer();
40: virtual ~evtport_demuxer();
41:
42: virtual int add_socket_wakeup(socket_wakeup* sv, int flags);
43: };
44:
45: }}
46:
47:
Start cpp section to demux/demux_iocp_demuxer.hpp[1
/1
]
1: #line 454 "./lpsrc/flx_demux.pak"
2:
3:
4:
5:
6:
7:
8:
9:
10:
11:
12:
13:
14:
15:
16: namespace flx { namespace demux {
17:
18:
19:
20: SOCKET DEMUX_EXTERN create_listener_socket(int* io_port, int backlog);
21:
22: SOCKET DEMUX_EXTERN nice_accept(SOCKET listener);
23: SOCKET DEMUX_EXTERN nice_connect(const char* addr, int port);
24: int DEMUX_EXTERN set_tcp_nodelay(int s, int disable_nagle);
25:
26:
27:
28:
29: class DEMUX_EXTERN winsock_initer
30: {
31: public:
32: winsock_initer();
33: ~winsock_initer();
34: };
35:
36:
37:
38:
39:
40: class DEMUX_EXTERN iocp_wakeup {
41: protected:
42: OVERLAPPED ol;
43:
44:
45: void clear_overlapped();
46: public:
47:
48:
49:
50: virtual void iocp_op_finished( DWORD nbytes, ULONG_PTR udat,
51: LPOVERLAPPED olp, int err) = 0;
52:
53:
54:
55:
56: virtual bool start_overlapped() = 0;
57:
58:
59: static iocp_wakeup* from_overlapped(LPOVERLAPPED olp);
60: };
61:
62:
63:
64: class DEMUX_EXTERN iocp_demuxer : public demuxer {
65: HANDLE iocp;
66:
67: void get_evts(bool poll);
68: public:
69: iocp_demuxer();
70: virtual ~iocp_demuxer();
71:
72:
73:
74:
75:
76:
77:
78:
79: int associate_with_iocp(HANDLE obj, ULONG_PTR udat);
80:
81: };
82:
83: }}
84:
85:
Start cpp section to demux/demux_kqueue_demuxer.hpp[1
/1
]
1: #line 540 "./lpsrc/flx_demux.pak"
2:
3:
4:
5:
6:
7: namespace flx { namespace demux {
8:
9:
10:
11:
12: class DEMUX_EXTERN kqueue_demuxer : public posix_demuxer {
13: int kq;
14: protected:
15:
16:
17:
18:
19: int add_kqueue_filter(socket_wakeup* sv, short filter);
20: int remove_kqueue_filter(int s, short filter);
21:
22: int remove_socket_wakeup(int s, int flags);
23: void get_evts(bool poll);
24: public:
25: kqueue_demuxer();
26: virtual ~kqueue_demuxer();
27:
28: virtual int add_socket_wakeup(socket_wakeup* sv, int flags);
29: };
30:
31: }}
32:
33:
Start cpp section to demux/demux_pfileio.hpp[1
/1
]
1: #line 574 "./lpsrc/flx_demux.pak"
2:
3:
4:
5:
6:
7:
8:
9:
10:
11:
12: namespace flx { namespace demux {
13:
14:
15:
16:
17:
18:
19: class DEMUX_EXTERN fileio_request : public flx::pthread::worker_task
20: {
21: long offset;
22:
23: int fd;
24: bool read_flag;
25:
26: int err;
27: public:
28:
29: sel_param pb;
30:
31: virtual ~fileio_request();
32: fileio_request();
33: fileio_request(int f, char* buf, long len, long off, bool rd);
34:
35: virtual void doit();
36: };
37:
38:
39:
40:
41: class DEMUX_EXTERN pasync_fileio : public flx::pthread::worker_fifo
42: {
43: public:
44: pasync_fileio(int n,int m) : worker_fifo(n,m) {}
45:
46: void add_fileio_request(fileio_request* req) { add_worker_task(req); }
47: };
48:
49: }}
50:
51:
Start cpp section to demux/demux_posix_demuxer.hpp[1
/1
]
1: #line 626 "./lpsrc/flx_demux.pak"
2:
3:
4:
5:
6:
7:
8:
9: namespace flx { namespace demux {
10: class DEMUX_EXTERN posix_demuxer;
11:
12:
13: class DEMUX_EXTERN posix_wakeup {
14: public:
15: virtual ~posix_wakeup() {}
16:
17:
18: virtual void wakeup(posix_demuxer& demux) = 0;
19: };
20:
21: class DEMUX_EXTERN socket_wakeup : public posix_wakeup {
22: public:
23: int s;
24: int wakeup_flags;
25:
26: socket_wakeup() : s(-1) {}
27: };
28:
29: class DEMUX_EXTERN posix_demuxer : public demuxer {
30: protected:
31: void async_quit();
32:
33:
34: public:
35: virtual ~posix_demuxer();
36:
37:
38:
39:
40:
41:
42:
43:
44:
45:
46: virtual int add_socket_wakeup(socket_wakeup* sv, int flags) = 0;
47:
48:
49:
50:
51:
52: static bool socket_recv(int s, sel_param* pb);
53: static bool socket_send(int s, sel_param* pb);
54: };
55:
56:
57:
58:
59:
60:
61: class DEMUX_EXTERN accept_control_block : public socket_wakeup {
62: public:
63: int accepted;
64: int socket_err;
65:
66: accept_control_block() : accepted(-1), socket_err(0) {}
67:
68: virtual int start(posix_demuxer& demux);
69: virtual void wakeup(posix_demuxer& demux);
70: };
71:
72: class DEMUX_EXTERN connect_control_block : public socket_wakeup {
73: public:
74: int socket_err;
75:
76: const char* addy;
77: int p;
78:
79: connect_control_block() : socket_err(0) {}
80:
81: virtual int start(posix_demuxer& demux);
82: virtual void wakeup(posix_demuxer& demux);
83:
84:
85:
86:
87:
88:
89: bool finished() { return ( 0 == socket_err); }
90: };
91:
92: }}
93:
94:
Start cpp section to demux/demux_select_demuxer.hpp[1
/1
]
1: #line 721 "./lpsrc/flx_demux.pak"
2:
3:
4:
5:
6:
7:
8:
9:
10:
11:
12:
13:
14:
15: namespace flx { namespace demux {
16:
17: class DEMUX_EXTERN select_demuxer : public posix_demuxer {
18: void remove_fd(int s);
19:
20:
21: fd_set master_read_set;
22: fd_set master_write_set;
23: fd_set master_except_set;
24:
25:
26:
27:
28: socket_wakeup* svs[FD_SETSIZE];
29:
30:
31: int fdmax;
32:
33: protected:
34: virtual void get_evts(bool poll);
35:
36: public:
37:
38: void copy_sets(fd_set& rset, fd_set& wset, fd_set& exset);
39:
40: bool select(fd_set& rset, fd_set& wset, fd_set& exset, bool poll);
41:
42: void process_sets(fd_set& rset, fd_set& wset, fd_set& exset);
43:
44: select_demuxer();
45:
46: virtual int add_socket_wakeup(socket_wakeup* sv, int flags);
47: };
48: }}
49:
50:
Start cpp section to demux/demux_ts_select_demuxer.hpp[1
/1
]
1: #line 772 "./lpsrc/flx_demux.pak"
2:
3:
4:
5:
6:
7:
8:
9: namespace flx { namespace demux {
10:
11:
12:
13: class DEMUX_EXTERN ts_select_demuxer : public posix_demuxer {
14:
15: flx::pthread::flx_mutex_t ham_fist;
16:
17: select_demuxer demux;
18:
19:
20:
21: self_piper sp;
22: protected:
23: virtual void get_evts(bool poll);
24: public:
25: ts_select_demuxer();
26: ~ts_select_demuxer();
27:
28: virtual int add_socket_wakeup(socket_wakeup* sv, int flags);
29:
30:
31: virtual demux_quit_flag* get_quit_flag() { return demux.get_quit_flag(); }
32: virtual void set_quit_flag(demux_quit_flag* f) { demux.set_quit_flag(f); }
33: };
34: }}
35:
36:
Start cpp section to demux/demux_timer_queue.hpp[1
/1
]
1: #line 809 "./lpsrc/flx_demux.pak"
2:
3:
4:
5:
6:
7: namespace flx { namespace demux {
8:
9:
10:
11:
12:
13:
14: class DEMUX_EXTERN sleep_task
15: {
16: public:
17: virtual ~sleep_task() {}
18:
19: virtual void fire() = 0;
20: };
21:
22: class DEMUX_EXTERN timer_queue
23: {
24: public:
25: virtual ~timer_queue() {}
26:
27: virtual void add_sleep_request(sleep_task* st, double delta) = 0;
28: virtual void add_abs_sleep_request(sleep_task* st, double when) = 0;
29:
30:
31:
32: static void get_time(double& t);
33: };
34:
35: }}
36:
37:
38:
Start cpp section to demux/demux_posix_timer_queue.hpp[1
/1
]
1: #line 848 "./lpsrc/flx_demux.pak"
2:
3:
4:
5:
6:
7:
8:
9:
10:
11: namespace flx { namespace demux {
12:
13:
14: class DEMUX_EXTERN posix_timer_queue : public timer_queue
15: {
16: flx::pthread::flx_mutex_t lock;
17: flx::pthread::flx_condv_t sleep_cond;
18: flx::pthread::flx_thread_t sleep_thread;
19: void* opaque_prio_queue;
20:
21: static void* thread_start(void*);
22: bool thread_loop_body();
23:
24:
25: void wakeup_thread();
26:
27: void add_sleep_request(sleep_task* st, timespec* abs);
28: public:
29: posix_timer_queue();
30: ~posix_timer_queue();
31:
32:
33: virtual void add_sleep_request(sleep_task* st, double delta);
34:
35:
36: virtual void add_abs_sleep_request(sleep_task* st, double when);
37: };
38:
39: }}
40:
41:
42:
Start cpp section to demux/demux_posix_timer_queue.cpp[1
/1
]
1: #line 891 "./lpsrc/flx_demux.pak"
2:
3:
4:
5:
6:
7:
8:
9:
10:
11:
12:
13:
14: namespace flx { namespace demux {
15:
16: #define MIL 1000000
17: #define BIL (MIL*1000)
18:
19: using namespace std;
20:
21:
22:
23: class future_evt
24: {
25: public:
26: timespec when;
27: sleep_task* task;
28:
29:
30: bool operator<(const future_evt& rhs) const
31: {
32: if(when.tv_sec != rhs.when.tv_sec)
33: return when.tv_sec > rhs.when.tv_sec;
34: else
35: return when.tv_nsec > rhs.when.tv_nsec;
36: }
37: };
38:
39: typedef priority_queue<future_evt> void_prio_queue;
40: #define PRIOQ ((void_prio_queue*)opaque_prio_queue)
41:
42: posix_timer_queue::posix_timer_queue()
43: {
44: opaque_prio_queue = new void_prio_queue;
45:
46:
47:
48: if(sleep_thread.init(thread_start, this))
49: fprintf(stderr, "failed to create posix timer queue thread!\n");
50: }
51:
52: posix_timer_queue::~posix_timer_queue()
53: {
54:
55:
56:
57:
58:
59:
60:
61:
62:
63:
64:
65:
66: add_sleep_request(NULL, 0.0);
67: wakeup_thread();
68:
69: sleep_thread.join();
70:
71: delete PRIOQ;
72: }
73:
74: static void
75: get_now(timespec* now)
76: {
77: struct timeval tp;
78:
79: if(gettimeofday(&tp, NULL) == -1)
80: perror("gettimeofday");
81:
82:
83: now->tv_sec = tp.tv_sec;
84: now->tv_nsec = tp.tv_usec*1000;
85:
86:
87: }
88:
89:
90:
91: #define SEC2TIMESPEC(ts, t) long wait_musec = (long)(t*MIL); \
92: timespec ts = { wait_musec / MIL, (wait_musec % MIL)*1000 }
93:
94:
95:
96: static void
97: calc_when(timespec* when, double delta)
98: {
99:
100:
101:
102:
103:
104:
105: timespec now;
106: get_now(&now);
107:
108:
109:
110:
111:
112: SEC2TIMESPEC(delay, delta);
113:
114:
115: when->tv_sec = now.tv_sec + delay.tv_sec;
116: when->tv_nsec = now.tv_nsec + delay.tv_nsec;
117:
118: if(when->tv_nsec >= BIL)
119: {
120:
121:
122: when->tv_sec++;
123: when->tv_nsec -= BIL;
124:
125:
126: }
127:
128:
129:
130:
131: }
132:
133:
134: void
135: posix_timer_queue::add_sleep_request(sleep_task* st, timespec* abs)
136: {
137: future_evt evt;
138: evt.task = st;
139: evt.when = *abs;
140:
141: flx::pthread::flx_mutex_locker_t locker(lock);
142:
143: PRIOQ->push(evt);
144:
145:
146:
147:
148: if(1 || PRIOQ->top().task == st)
149: {
150:
151: wakeup_thread();
152: }
153: }
154:
155:
156:
157: void
158: posix_timer_queue::add_sleep_request(sleep_task* st, double delta)
159: {
160:
161: timespec when;
162: calc_when(&when, delta);
163:
164: add_sleep_request(st, &when);
165: }
166:
167: void
168: posix_timer_queue::add_abs_sleep_request(sleep_task* st, double when)
169: {
170:
171: SEC2TIMESPEC(abs_time, when);
172: add_sleep_request(st, &abs_time);
173: }
174:
175:
176:
177: void
178: posix_timer_queue::wakeup_thread()
179: {
180: sleep_cond.signal();
181: }
182:
183: void*
184: posix_timer_queue::thread_start(void* udat)
185: {
186: posix_timer_queue* q = (posix_timer_queue*)udat;
187:
188:
189: while(q->thread_loop_body()) ;
190:
191: return 0;
192: }
193:
194: bool
195: posix_timer_queue::thread_loop_body()
196: {
197:
198: flx::pthread::flx_mutex_locker_t locker(lock);
199:
200: int res;
201:
202:
203:
204:
205:
206:
207: while(!PRIOQ->empty())
208: {
209: future_evt evt = PRIOQ->top();
210:
211:
212: if(!evt.task) return false;
213:
214: future_evt now;
215: get_now(&now.when);
216:
217:
218:
219: if(now < evt)
220: {
221:
222:
223:
224: evt.task->fire();
225: PRIOQ->pop();
226: }
227: else
228: {
229:
230:
231:
232:
233:
234: (void)sleep_cond.timedwait(&lock, &evt.when);
235:
236:
237:
238:
239:
240:
241: }
242: }
243:
244:
245:
246:
247:
248: sleep_cond.wait(&lock);
249:
250:
251:
252: return true;
253: }
254:
255:
256:
257:
258: void
259: timer_queue::get_time(double& t)
260: {
261: timespec now;
262: get_now(&now);
263:
264:
265: t = now.tv_sec + (now.tv_nsec*BIL);
266: }
267:
268: }}
269:
Start cpp section to demux/demux_win_timer_queue.hpp[1
/1
]
1: #line 1161 "./lpsrc/flx_demux.pak"
2:
3:
4:
5:
6:
7:
8:
9:
10: namespace flx { namespace demux {
11:
12: class DEMUX_EXTERN win_timer_queue : public timer_queue
13: {
14: HANDLE timer_queue;
15:
16: static VOID CALLBACK timer_callback(PVOID, BOOLEAN);
17: public:
18: win_timer_queue();
19: ~win_timer_queue();
20:
21: virtual void add_sleep_request(sleep_task* st, double delta);
22: virtual void add_abs_sleep_request(sleep_task* st, double when);
23:
24: };
25:
26: }}
27:
28:
29:
Start cpp section to demux/demux_win_timer_queue.cpp[1
/1
]
1: #line 1191 "./lpsrc/flx_demux.pak"
2:
3:
4:
5:
6:
7:
8:
9:
10:
11:
12:
13: namespace flx { namespace demux {
14:
15: #define MIL 1000000
16:
17: typedef struct
18: {
19: sleep_task* st;
20: HANDLE timer;
21: HANDLE timer_queue;
22: } timer_cookie;
23:
24: win_timer_queue::win_timer_queue()
25: {
26:
27:
28: timer_queue = CreateTimerQueue();
29: if(!timer_queue)
30: {
31: fprintf(stderr, "CreateTimerQueue failed: %i\n", GetLastError());
32: throw -1;
33: }
34:
35: }
36:
37:
38: win_timer_queue::~win_timer_queue()
39: {
40:
41:
42:
43:
44:
45:
46:
47:
48:
49: if(!DeleteTimerQueueEx(timer_queue, INVALID_HANDLE_VALUE))
50: {
51: fprintf(stderr, "DeleteTimerQueueEx failed: %i\n", GetLastError());
52:
53: }
54:
55: }
56:
57:
58:
59: void
60: win_timer_queue::add_sleep_request(sleep_task* st, double delta)
61: {
62:
63:
64: timer_cookie* tc = new timer_cookie;
65:
66:
67: tc->st = st;
68: tc->timer_queue = timer_queue;
69:
70:
71:
72:
73: if(!CreateTimerQueueTimer(
74: &tc->timer,
75: timer_queue,
76:
77: timer_callback,
78: tc,
79: (DWORD)(delta*1000),
80: 0,
81: WT_EXECUTEINTIMERTHREAD))
82: {
83: fprintf(stderr, "CreateTimerQueueTimer failed: %i\n", GetLastError());
84: delete tc;
85: return;
86: }
87: }
88:
89:
90:
91: VOID CALLBACK
92: win_timer_queue::timer_callback(PVOID udat, BOOLEAN timer_or_wait_fired)
93: {
94: timer_cookie* tc = (timer_cookie*)udat;
95:
96:
97:
98:
99: if(!tc)
100: {
101:
102: fprintf(stderr, "WHOA - NULL queue cookie! (fired: %i)\n",
103: timer_or_wait_fired);
104: return;
105: }
106:
107:
108:
109:
110:
111: tc->st->fire();
112:
113:
114:
115: if(!DeleteTimerQueueTimer(tc->timer_queue, tc->timer, NULL))
116: {
117: int err = GetLastError();
118:
119: if( ERROR_IO_PENDING != err)
120: {
121: fprintf(stderr, "DeleteTimerQueueTimer of %p failed: %i\n",
122: tc->timer, err);
123: }
124: else
125: {
126:
127: fprintf(stderr, "DeleteTimerQueueTimer = ERROR_IO_PENDING\n");
128: fprintf(stderr, "Apparently this is ok...\n");
129: }
130: }
131: delete tc;
132:
133:
134: }
135:
136:
137:
138: void
139: timer_queue::get_time(double& t)
140: {
141: SYSTEMTIME sysnow;
142: GetSystemTime(&sysnow);
143:
144:
145:
146:
147:
148: FILETIME fnow;
149: if(!SystemTimeToFileTime(&sysnow, &fnow))
150: {
151: fprintf(stderr, "SystemTimeToFileTime failed: %i\n", GetLastError());
152: t = 0;
153: return;
154: }
155:
156: ULARGE_INTEGER now;
157:
158: assert(sizeof(now) == sizeof(fnow));
159: memcpy(&now, &fnow, sizeof(now));
160:
161:
162:
163: t = now.QuadPart*MIL*10;
164: }
165:
166: void
167: win_timer_queue::add_abs_sleep_request(sleep_task* st, double when)
168: {
169:
170: double now;
171: get_time(now);
172: double delta = when-now;
173: if(delta < 0.0) delta = 0.0;
174: add_sleep_request(st, delta);
175: }
176:
177: }}
Start cpp section to demux/demux_demuxer.cpp[1
/1
]
1: #line 1369 "./lpsrc/flx_demux.pak"
2:
3:
4:
5:
Start cpp section to demux/demux_epoll_demuxer.cpp[1
/1
]
1: #line 1375 "./lpsrc/flx_demux.pak"
2:
3:
4:
5:
6:
7:
8:
9:
10:
11:
12:
13:
14:
15:
16:
17:
18:
19:
20:
21:
22: namespace flx { namespace demux {
23:
24: epoll_demuxer::epoll_demuxer()
25: : epoll_fd(-1)
26: {
27:
28:
29:
30:
31:
32:
33:
34:
35: epoll_fd = epoll_create(1);
36: if(-1 == epoll_fd)
37: {
38: perror("epoll_create");
39: throw -1;
40: }
41: }
42:
43: epoll_demuxer::~epoll_demuxer()
44: {
45: async_quit();
46:
47: if(-1 != epoll_fd)
48: {
49: if(close(epoll_fd) != 0)
50: perror("epoll close");
51: }
52: }
53:
54: int
55: epoll_demuxer::add_socket_wakeup(socket_wakeup* sv, int flags)
56: {
57: int s = sv->s;
58:
59: struct epoll_event evt;
60:
61:
62:
63:
64:
65:
66:
67: evt.events = 0;
68:
69: if(flags & PDEMUX_READ) evt.events |= EPOLLIN;
70: if(flags & PDEMUX_WRITE) evt.events |= EPOLLOUT;
71:
72:
73:
74:
75:
76:
77:
78:
79:
80:
81:
82:
83: evt.events |= (EPOLLHUP | EPOLLERR);
84:
85: evt.data.ptr = sv;
86:
87: if(epoll_ctl(epoll_fd, EPOLL_CTL_ADD, s, &evt) == -1)
88: {
89:
90:
91:
92:
93:
94:
95: #if 0
96: int err = errno;
97:
98: if(EEXIST == err)
99: {
100:
101: fprintf(stderr, "RETRYING WITH EPOLL_CTL_MOD\n");
102: if(epoll_ctl(epoll_fd, EPOLL_CTL_MOD, s, &evt) != -1)
103: return 0;
104: }
105: #endif
106: perror("epoll_ctl (add)");
107:
108: return -1;
109: }
110: return 0;
111: }
112:
113:
114:
115: void
116: epoll_demuxer::remove_wakeup(int s)
117: {
118:
119:
120: struct epoll_event evt;
121:
122:
123:
124:
125: if(epoll_ctl(epoll_fd, EPOLL_CTL_DEL, s, &evt) == -1)
126: {
127:
128:
129:
130: perror("epoll_ctl (remove)");
131: }
132: }
133:
134: void
135: epoll_demuxer::get_evts(bool poll)
136: {
137: struct epoll_event evt;
138:
139: switch(epoll_wait(epoll_fd, &evt, 1, (poll) ? 0 : ~0))
140: {
141: case -1:
142: perror("epoll_wait");
143:
144: case 0:
145: return;
146: }
147:
148: socket_wakeup* sv = (socket_wakeup*)evt.data.ptr;
149:
150:
151:
152:
153:
154:
155:
156: sv->wakeup_flags = 0;
157:
158: bool wake = false;
159:
160:
161:
162: if(evt.events & EPOLLIN)
163: {
164:
165: sv->wakeup_flags |= PDEMUX_READ;
166: wake = true;
167: }
168:
169: if(evt.events & EPOLLOUT)
170: {
171:
172: sv->wakeup_flags |= PDEMUX_WRITE;
173: wake = true;
174: }
175:
176:
177:
178:
179:
180:
181:
182: if(evt.events & EPOLLHUP)
183: {
184: fprintf(stderr, "EPOLLHUP for %p->%i\n", sv, sv->s);
185: wake = true;
186: }
187:
188: if(evt.events & EPOLLERR)
189: {
190:
191:
192:
193:
194:
195: fprintf(stderr,"epoll error, waking: %i (errno?)\n", sv->s);
196:
197: wake = true;
198: }
199:
200: if((evt.events & ~(EPOLLERR|EPOLLIN|EPOLLOUT|EPOLLHUP)))
201: {
202: fprintf(stderr,"unknown events in epoll_demuxer %x\n", evt.events);
203: }
204:
205:
206:
207: if(wake)
208: {
209:
210:
211:
212: remove_wakeup(sv->s);
213:
214: sv->wakeup(*this);
215: }
216: }
217: }}
218:
Start cpp section to demux/demux_evtport_demuxer.cpp[1
/1
]
1: #line 1594 "./lpsrc/flx_demux.pak"
2:
3:
4:
5:
6:
7:
8:
9:
10:
11:
12:
13:
14:
15: namespace flx { namespace demux {
16:
17:
18:
19:
20:
21: evtport_demuxer::evtport_demuxer()
22: {
23: if((evtport = port_create()) < 0)
24: {
25: perror("port_create");
26: throw -1;
27: }
28: }
29:
30: evtport_demuxer::~evtport_demuxer()
31: {
32: async_quit();
33:
34: if(-1 != evtport)
35: {
36: if(close(evtport) != 0)
37: perror("evtport close");
38: }
39: }
40:
41: int
42: evtport_demuxer::add_socket_wakeup(socket_wakeup* sv, int flags)
43: {
44: if(flags & ~(PDEMUX_READ|PDEMUX_WRITE))
45: return -1;
46:
47: int events = 0;
48: int s = sv->s;
49:
50: if(flags & PDEMUX_READ) events |= POLLIN;
51: if(flags & PDEMUX_WRITE) events |= POLLOUT;
52:
53:
54: events |= (POLLHUP | POLLERR);
55:
56:
57:
58:
59:
60:
61: if(port_associate(evtport, PORT_SOURCE_FD, (uintptr_t)s, events, sv) == -1)
62: {
63: perror("add socket_wakeup/port_associate");
64: return -1;
65: }
66:
67: return 0;
68: }
69:
70:
71:
72:
73:
74:
75:
76:
77: void
78: evtport_demuxer::remove_wakeup(int s)
79: {
80: if(port_dissociate(evtport, PORT_SOURCE_FD, s) == -1)
81: perror("reading port_dissociate");
82: }
83:
84: #define POLLPR(ev) if(e->portev_events & ev) fprintf(stderr,#ev", ")
85:
86: static void
87: print_port_evt(port_event_t* e)
88: {
89: char* srcstr[PORT_SOURCE_ALERT-PORT_SOURCE_AIO+1]
90: = { "ALERT", "TIMER", "USER", "FD", "AIO"};
91: fprintf(stderr,"e: %p\n\t", e);
92:
93: fprintf(stderr,"portev_events: ");
94:
95:
96: POLLPR(POLLIN); POLLPR(POLLOUT); POLLPR(POLLPRI);
97: POLLPR(POLLRDNORM); POLLPR(POLLRDBAND); POLLPR(POLLWRBAND);
98:
99:
100:
101: POLLPR(POLLERR); POLLPR(POLLHUP); POLLPR(POLLNVAL); POLLPR(POLLREMOVE);
102:
103: fprintf(stderr," (%x)\n\t", e->portev_events);
104:
105: int src = e->portev_source;
106: if(PORT_SOURCE_AIO <= src && src <= PORT_SOURCE_ALERT)
107: {
108: fprintf(stderr,"portev_source: PORT_SOURCE_%s (%x)\n\t",
109: srcstr[src-PORT_SOURCE_AIO], src);
110: }
111: else
112: {
113: fprintf(stderr,"portev_source: %x\n\t", e->portev_source);
114: }
115:
116: fprintf(stderr,"portev_pad: %x\n\t", e->portev_pad);
117:
118: fprintf(stderr,"portev_object: %x\n\t", e->portev_object);
119: fprintf(stderr,"portev_user: %p\n", e->portev_user);
120: }
121:
122: void
123: evtport_demuxer::get_evts(bool poll)
124: {
125:
126:
127:
128: port_event_t evt;
129: timespec timeout, *tp = NULL;
130:
131: if(poll)
132: {
133: timeout.tv_sec = 0;
134: timeout.tv_nsec = 0;
135: tp = &timeout;
136: }
137:
138:
139: if(port_get(evtport, &evt, tp) == -1)
140: {
141: perror("port_get");
142: return;
143: }
144:
145:
146:
147:
148: socket_wakeup* sv = (socket_wakeup*)evt.portev_user;
149: int s = evt.portev_object;
150:
151: assert(sv != NULL);
152: if(evt.portev_source != PORT_SOURCE_FD)
153: {
154:
155:
156:
157:
158:
159:
160:
161: return;
162: }
163:
164:
165:
166: sv->wakeup_flags = 0;
167:
168: if(evt.portev_events & POLLERR)
169: {
170: fprintf(stderr,"ERRORS on s = %i, sv = %p\n", s, sv);
171:
172:
173: }
174:
175:
176:
177:
178:
179:
180: if(evt.portev_events & POLLIN)
181: {
182:
183: sv->wakeup_flags |= PDEMUX_READ;
184: }
185:
186: if(evt.portev_events & POLLOUT)
187: {
188:
189: sv->wakeup_flags |= PDEMUX_WRITE;
190: }
191:
192:
193: if(evt.portev_events & ~(POLLIN | POLLOUT | POLLERR))
194: {
195: fprintf(stderr,"UNSOLICITED events in evtport_demuxer (%x)\n",
196: evt.portev_events);
197: }
198:
199: assert(sv->wakeup_flags != 0);
200:
201: if(sv->wakeup_flags)
202: sv->wakeup(*this);
203: }
204: }}
205:
Start cpp section to demux/demux_iocp_demuxer.cpp[1
/1
]
1: #line 1800 "./lpsrc/flx_demux.pak"
2:
3:
4:
5:
6:
7:
8:
9:
10: namespace flx { namespace demux {
11:
12:
13:
14:
15:
16:
17:
18:
19:
20:
21:
22:
23:
24:
25:
26:
27:
28:
29:
30:
31:
32:
33:
34:
35:
36:
37:
38:
39:
40:
41:
42:
43:
44:
45:
46: winsock_initer::winsock_initer()
47: {
48: WSADATA wsaData;
49:
50:
51:
52:
53:
54: int res= WSAStartup(MAKEWORD(2, 2), &wsaData);
55: if(res!= 0)
56: {
57:
58: fprintf(stderr,"couldn't find usable winsock dll: %i\n", res);
59: throw res;
60: }
61: }
62:
63: winsock_initer::~winsock_initer()
64: {
65: if(WSACleanup() != 0)
66: {
67: fprintf(stderr,"WSACleanup failed %i\n", WSAGetLastError());
68: }
69: }
70:
71:
72:
73: iocp_wakeup*
74: iocp_wakeup::from_overlapped(LPOVERLAPPED olp)
75: {
76:
77:
78: return (iocp_wakeup*)((char*)olp-offsetof(iocp_wakeup, ol));
79: }
80:
81: void
82: iocp_wakeup::clear_overlapped()
83: {
84: ZeroMemory(&ol, sizeof(ol));
85: }
86:
87:
88: iocp_demuxer::iocp_demuxer()
89: : iocp(NULL)
90: {
91:
92:
93:
94:
95:
96:
97:
98:
99:
100: fprintf(stderr,"CreateIoCompletionPort with ONE WORKER THREAD\n");
101: iocp = CreateIoCompletionPort(
102: INVALID_HANDLE_VALUE,
103: NULL,
104: (ULONG_PTR)0,
105: 1
106: );
107:
108: if(NULL == iocp)
109: {
110: DWORD err = GetLastError();
111: fprintf(stderr,"failed to create completion port: %li\n", err);
112: throw -1;
113: }
114: }
115:
116: iocp_demuxer::~iocp_demuxer()
117: {
118: fprintf(stderr, "~iocp (%p) NOW WITH ASYNC QUITTER!\n", iocp);
119: try
120: {
121: demux_quitter q;
122: q.quit(this);
123: }
124: catch(...)
125: {
126: fprintf(stderr, "~iocp_demuxer async quit threw exception!\n");
127:
128: }
129:
130:
131: if(NULL != iocp && !CloseHandle(iocp))
132: {
133: DWORD err = GetLastError();
134: fprintf(stderr,"failed cleanup iocp: %li\n", err);
135: }
136: }
137:
138: int
139: iocp_demuxer::associate_with_iocp(HANDLE obj, ULONG_PTR udat)
140: {
141:
142:
143:
144:
145:
146:
147: if(CreateIoCompletionPort(obj, iocp, udat, 0) == NULL) {
148:
149:
150: fprintf(stderr,"CreateIoCompletionPort failed to register object: %li\n",
151: GetLastError());
152: return -1;
153: }
154:
155: return 0;
156: }
157:
158: void
159: iocp_demuxer::get_evts(bool poll) {
160:
161:
162:
163:
164:
165:
166:
167:
168:
169:
170: DWORD nbytes;
171: ULONG_PTR udat;
172: LPOVERLAPPED olp;
173:
174:
175:
176:
177:
178: int err = NO_ERROR;
179:
180:
181:
182: if(!GetQueuedCompletionStatus(iocp, &nbytes, &udat, &olp,
183: (poll) ? 0: INFINITE))
184: {
185:
186:
187:
188:
189: err = GetLastError();
190:
191:
192:
193:
194:
195: if(WAIT_TIMEOUT == err)
196: {
197:
198:
199: return;
200: }
201: else if(ERROR_OPERATION_ABORTED == err)
202: {
203:
204:
205: fprintf(stderr, "WHOA!!! - disassociate before killing handle\n");
206: return;
207: }
208: else
209: {
210: fprintf(stderr,"GetQueuedCompletionStatus returned false: %i\n",
211: err);
212:
213: }
214:
215:
216:
217:
218: }
219:
220:
221:
222:
223:
224:
225:
226:
227:
228:
229:
230:
231:
232: assert( olp );
233:
234:
235: iocp_wakeup* wakeup = iocp_wakeup::from_overlapped(olp);
236:
237:
238: wakeup->iocp_op_finished(nbytes, udat, olp, err);
239: }
240:
241:
242:
243:
244:
245:
246: SOCKET
247: create_listener_socket(int* io_port, int backlog)
248: {
249: fprintf(stderr,"creating_listener_socket\n");
250: SOCKET listener;
251:
252:
253:
254:
255: listener = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
256:
257: if (INVALID_SOCKET == listener)
258: {
259: fprintf(stderr,"listener create failed: %i\n", WSAGetLastError());
260: return INVALID_SOCKET;
261: }
262:
263: SOCKADDR_IN addr;
264:
265:
266: ZeroMemory(&addr, sizeof(addr));
267: addr.sin_family = AF_INET;
268: addr.sin_addr.s_addr = htonl(INADDR_ANY);
269: addr.sin_port = htons(*io_port);
270:
271:
272: int res;
273: res = bind(listener, (LPSOCKADDR)&addr, sizeof(addr));
274:
275: if (SOCKET_ERROR == res)
276: {
277: fprintf(stderr,"bind() failed %i\n", WSAGetLastError());
278: if(closesocket(listener) == SOCKET_ERROR)
279: {
280: fprintf(stderr,"closesocket failed on listener: %i\n",
281: WSAGetLastError());
282: }
283: return INVALID_SOCKET;
284: }
285:
286:
287: if(0 == *io_port)
288: {
289: int namelen;
290:
291: if (getsockname(listener, (struct sockaddr *)&addr, &namelen)
292: == SOCKET_ERROR)
293: {
294: fprintf(stderr, "getsockname failed (%i)\n", WSAGetLastError());
295:
296: if(closesocket(listener) == SOCKET_ERROR)
297: {
298: fprintf(stderr,"closesocket failed on listener: %i\n",
299: WSAGetLastError());
300: }
301: return INVALID_SOCKET;
302: }
303:
304: *io_port = ntohs(addr.sin_port);
305: }
306:
307:
308: res = listen(listener, backlog);
309: if (SOCKET_ERROR == res)
310: {
311: fprintf(stderr,"listen() failed %i\n", WSAGetLastError());
312:
313: if(closesocket(listener) == SOCKET_ERROR)
314: {
315: fprintf(stderr,"closesocket failed on listener: %i\n",
316: WSAGetLastError());
317: }
318:
319: return INVALID_SOCKET;
320: }
321:
322: return listener;
323: }
324:
325:
326:
327:
328:
329:
330:
331:
332:
333:
334:
335:
336:
337: SOCKET
338: nice_accept(SOCKET listener)
339: {
340: struct sockaddr_in remoteaddr;
341: int addrlen = sizeof(remoteaddr);
342: SOCKET s;
343:
344:
345: s = accept(listener, (struct sockaddr*)&remoteaddr, &addrlen);
346:
347: if(INVALID_SOCKET == s)
348: {
349: fprintf(stderr,"nice_accept failed (%i)\n", WSAGetLastError());
350: }
351:
352:
353:
354:
355: return s;
356: }
357:
358:
359: static int
360: connect_sock(SOCKET s, const char* addr, int port)
361: {
362: struct sockaddr_in sock_addr;
363:
364: memset(&sock_addr, 0, sizeof(sock_addr));
365: sock_addr.sin_family = AF_INET;
366: sock_addr.sin_addr.s_addr = inet_addr(addr);
367: sock_addr.sin_port = htons(port);
368:
369: return connect(s, (struct sockaddr *)&sock_addr, sizeof(sock_addr));
370: }
371:
372:
373:
374: SOCKET
375: nice_connect(const char* addr, int port)
376: {
377: SOCKET s;
378:
379: if((s = socket(AF_INET, SOCK_STREAM, 0)) != INVALID_SOCKET
380: && connect_sock(s, addr, port) != SOCKET_ERROR)
381: {
382: return s; /* success! */
383: }
384:
385: /* something happened (not as good as catch 22) */
386: fprintf(stderr,"nice_connect failed (%i)\n", WSAGetLastError());
387:
388: if(INVALID_SOCKET != s && closesocket(s) == SOCKET_ERROR)
389: fprintf(stderr,"nice close failed (%i)\n", WSAGetLastError());
390:
391: return INVALID_SOCKET;
392: }
393:
394:
395:
396: int
397: set_tcp_nodelay(int s, int disable)
398: {
399: BOOL disable_nagle = (disable) ? true : false;
400:
401: int res = setsockopt(s, IPPROTO_TCP, TCP_NODELAY,
402: (const char*)&disable_nagle, sizeof(disable_nagle));
403:
404: return (res == SOCKET_ERROR) ? -1 : 0;
405: }
406:
407: }}
408:
Start cpp section to demux/demux_overlapped.cpp[1
/1
]
1: #line 2209 "./lpsrc/flx_demux.pak"
2:
3:
4:
5:
6:
7:
8: typedef
9: BOOL
10: (PASCAL FAR * LPFN_CONNECTEX) (
11: IN SOCKET s,
12: IN const struct sockaddr FAR *name,
13: IN int namelen,
14: IN PVOID lpSendBuffer OPTIONAL,
15: IN DWORD dwSendDataLength,
16: OUT LPDWORD lpdwBytesSent,
17: IN LPOVERLAPPED lpOverlapped
18: );
19:
20:
21: {0x25a207b9,0xddf3,0x4660,{0x8e,0xe9,0x76,0xe5,0x8c,0x74,0x06,0x3e}}
22:
23:
24: namespace flx { namespace demux {
25:
26:
27:
28:
29:
30:
31:
32: bool
33: acceptex_control_block::start_overlapped()
34: {
35: clear_overlapped();
36:
37:
38:
39:
40:
41:
42:
43: DWORD nbytes = 0;
44: BOOL success;
45:
46:
47:
48:
49: success = AcceptEx(listener, acceptor,
50: accept_buf,
51: 0,
52: ACCEPTEX_ADDR_SIZE,
53: ACCEPTEX_ADDR_SIZE,
54: &nbytes,
55: &ol);
56:
57:
58: if(success)
59: {
60:
61: fprintf(stderr,"WHOA! AcceptEx RETURNED SUCCESS IMMEDIATELY!\n");
62: fprintf(stderr, "This could be bad, as wait should call op_finish\n");
63: fprintf(stderr, "We also lose the udat cookie (set to NULL)\n");
64:
65:
66:
67:
68: iocp_op_finished(nbytes, 0, &ol, NO_ERROR);
69: return false;
70: }
71: else
72: {
73: int err = WSAGetLastError();
74:
75: if(ERROR_IO_PENDING == err)
76: {
77:
78:
79:
80: }
81: else
82: {
83: fprintf(stderr,"AcceptEx failed: %i\n", err);
84: fprintf(stderr,"returning true should wake thread to detect failure.\n");
85: return true;
86: }
87: }
88: return false;
89:
90: }
91:
92:
93: #if 0
94:
95:
96:
97:
98:
99:
100:
101: #endif
102:
103:
104:
105: static int
106: GetConnectExAddr(SOCKET s, LPFN_CONNECTEX* conn_fn)
107: {
108: *conn_fn = NULL;
109: GUID GuidConnectEx = WSAID_CONNECTEX;
110: DWORD dwBytes;
111: int err;
112:
113: err = WSAIoctl(s,
114: SIO_GET_EXTENSION_FUNCTION_POINTER,
115: &GuidConnectEx,
116: sizeof(GuidConnectEx),
117: conn_fn,
118: sizeof(*conn_fn),
119: &dwBytes,
120: NULL, NULL);
121:
122: return err;
123: }
124:
125:
126:
127:
128: static int
129: bind_socket(SOCKET s)
130: {
131: SOCKADDR_IN addr;
132:
133: ZeroMemory(&addr, sizeof(addr));
134: addr.sin_family = AF_INET;
135: addr.sin_addr.s_addr = htonl(INADDR_ANY);
136: addr.sin_port = htons(0);
137:
138: return bind(s, (LPSOCKADDR)&addr, sizeof(addr));
139: }
140:
141: bool
142: connectex_control_block::start_overlapped()
143: {
144: clear_overlapped();
145:
146:
147: socket_err = ERROR_IO_PENDING;
148:
149:
150: DWORD bytes_sent = 0;
151: BOOL success;
152:
153: LPFN_CONNECTEX pfConnectEx;
154:
155:
156:
157:
158:
159:
160:
161:
162: if(GetConnectExAddr(s, &pfConnectEx) == SOCKET_ERROR)
163: {
164: fprintf(stderr,"GetConnectExAddr failed: %i\n", WSAGetLastError());
165: return true;
166: }
167:
168:
169:
170:
171:
172:
173: if(bind_socket(s) == SOCKET_ERROR)
174: fprintf(stderr,"ConnectEx bind failed: %i\n", WSAGetLastError());
175:
176:
177:
178: SOCKADDR_IN addr;
179:
180:
181: ZeroMemory(&addr, sizeof(addr));
182: addr.sin_family = AF_INET;
183: addr.sin_addr.s_addr = inet_addr(addy);
184: addr.sin_port = htons(p);
185:
186:
187:
188:
189:
190:
191:
192: success = (*pfConnectEx)(s,
193: (LPSOCKADDR)&addr,
194: sizeof(addr),
195: NULL,
196: 0,
197: NULL,
198: &ol);
199:
200:
201:
202:
203: if(success)
204: {
205: fprintf(stderr,"WHOA! ConnectEx RETURNED SUCCESS IMMEDIATELY!\n");
206: fprintf(stderr, "BAD: calls op_finish and loses udat cookie\n");
207:
208: iocp_op_finished(bytes_sent, 0, &ol, NO_ERROR);
209: return false;
210: }
211: else
212: {
213: int err = WSAGetLastError();
214:
215: if(ERROR_IO_PENDING == err)
216: {
217:
218:
219:
220: }
221: else
222: {
223:
224:
225: fprintf(stderr,"ConnectEx failed: %i\n", err);
226: return true;
227: }
228: }
229: return false;
230: }
231:
232:
233:
234: bool
235: transmitfile_control_block::start_overlapped()
236: {
237: clear_overlapped();
238:
239:
240:
241:
242:
243:
244:
245:
246:
247: if(TransmitFile(s, file, 0, 0, &ol, NULL, flags))
248: {
249: fprintf(stderr,"Transmit file succeeded immediately! waking...\n");
250: return true;
251: }
252: else
253: {
254: DWORD err = WSAGetLastError();
255:
256:
257:
258: if(ERROR_IO_PENDING != err && WSA_IO_PENDING != err)
259: fprintf(stderr,"genuine error from TransmitFile: %li\n", err);
260: }
261: return false;
262: }
263:
264:
265:
266:
267:
268: wsasocketio_control_block::wsasocketio_control_block(SOCKET src, sel_param* pb,
269: bool inread)
270: : s(src), ppb(pb), reading(inread)
271: {
272: }
273:
274: bool
275: wsasocketio_control_block::start_overlapped()
276: {
277: clear_overlapped();
278:
279: error = 0;
280:
281:
282: DWORD imm_bytes;
283: int recv_res;
284:
285:
286: wbufs[0].len = ppb->buffer_size - ppb->bytes_written;
287: wbufs[0].buf = ppb->buffer + ppb->bytes_written;
288:
289:
290:
291:
292:
293:
294:
295:
296:
297:
298:
299:
300:
301:
302:
303: DWORD flags = MSG_PARTIAL;
304:
305:
306: if(reading)
307: recv_res = WSARecv(s, wbufs, NUM_WBUFS, &imm_bytes, &flags, &ol, NULL);
308: else
309: recv_res = WSASend(s, wbufs, NUM_WBUFS, &imm_bytes, flags, &ol, NULL);
310:
311:
312:
313: switch(recv_res)
314: {
315: case 0:
316: {
317:
318:
319:
320:
321:
322:
323:
324:
325:
326:
327:
328:
329:
330:
331:
332:
333:
334: return false;
335: }
336: break;
337: case SOCKET_ERROR:
338: {
339: DWORD err = WSAGetLastError();
340:
341:
342:
343: if(ERROR_IO_PENDING == err || WSA_IO_PENDING == err)
344: {
345:
346:
347: return false;
348: }
349:
350: fprintf(stderr,"WSARecv/Send returned SOCKET_ERR: %li\n", err);
351: return true;
352: }
353: break;
354: default:
355: {
356: fprintf(stderr,"WSARecv/Send returned other error: %i, GetLastError: %li\n",
357: recv_res, GetLastError());
358: return true;
359: }
360: break;
361: }
362:
363: return false;
364: }
365:
366:
367:
368: void
369: wsasocketio_control_block::iocp_op_finished(DWORD nbytes, ULONG_PTR udat,
370: LPOVERLAPPED olp, int err)
371: {
372: error = err;
373:
374:
375:
376:
377: if(err)
378: {
379: fprintf(stderr, "wsasocketio, got error: %i\n", err);
380: }
381:
382:
383: assert( !ppb->finished() );
384:
385: ppb->bytes_written += nbytes;
386:
387: if(0 == nbytes)
388: {
389: fprintf(stderr, "wsaiosocket got zero bytes: err=%i, read=%i\n",
390: err, reading);
391: }
392:
393:
394:
395:
396:
397:
398: if(0 == nbytes || ppb->finished())
399: {
400: return;
401: }
402: else
403: {
404:
405: fprintf(stderr,"didn\'t get everything (%li of %li bytes)\n",
406: ppb->bytes_written, ppb->buffer_size);
407: if(start_overlapped())
408: {
409: fprintf(stderr, "UM, socket restart finished immediately\n");
410: fprintf(stderr, "causes new wakeup? or should I loop around?\n");
411: }
412: }
413: }
414:
415:
416:
417:
418: winfileio_control_block::winfileio_control_block(HANDLE f, void* buf, int len,
419: bool inread)
420: : reading(inread), file(f)
421: {
422:
423:
424: pb.buffer = (char*)buf;
425: pb.buffer_size = len;
426: pb.bytes_written = 0;
427: }
428:
429:
430:
431:
432: bool
433: winfileio_control_block::start_overlapped()
434: {
435:
436: clear_overlapped();
437:
438:
439: BOOL success;
440:
441:
442: if(reading)
443:
444: success = ReadFile(file, pb.buffer, pb.buffer_size, NULL, &ol);
445: else
446:
447: success = WriteFile(file, pb.buffer, pb.buffer_size, NULL, &ol);
448:
449:
450:
451: if(!success)
452: {
453: int err = GetLastError();
454:
455:
456: if(ERROR_IO_PENDING == err)
457: {
458: return false;
459: }
460: else
461: {
462: fprintf(stderr,"%sFile failed! (%li)\n", (reading) ? "Read" : "Write",
463: err);
464: return true;
465: }
466:
467:
468:
469:
470:
471: }
472:
473: return false;
474: }
475:
476: }}
477:
Start cpp section to demux/demux_overlapped.hpp[1
/1
]
1: #line 2687 "./lpsrc/flx_demux.pak"
2:
3:
4:
5:
6:
7:
8:
9:
10:
11: namespace flx { namespace demux {
12:
13:
14:
15:
16:
17:
18:
19:
20:
21:
22:
23:
24: class DEMUX_EXTERN acceptex_control_block : public iocp_wakeup {
25: enum { ACCEPTEX_ADDR_SIZE = sizeof(SOCKADDR_IN) + 16 };
26:
27: public:
28: SOCKET listener, acceptor;
29:
30: char accept_buf[2*ACCEPTEX_ADDR_SIZE];
31:
32: virtual bool start_overlapped();
33:
34: acceptex_control_block()
35: : listener(INVALID_SOCKET), acceptor(INVALID_SOCKET) {}
36: };
37:
38: class DEMUX_EXTERN connectex_control_block : public iocp_wakeup
39: {
40: public:
41:
42: int socket_err;
43:
44:
45: SOCKET s;
46: const char* addy;
47: int p;
48:
49:
50: connectex_control_block() : s(INVALID_SOCKET), addy(0), p(0) {}
51:
52:
53:
54: bool finished() { return ERROR_IO_PENDING != socket_err; }
55:
56: virtual bool start_overlapped();
57: };
58:
59:
60: class DEMUX_EXTERN transmitfile_control_block : public iocp_wakeup {
61: SOCKET s;
62: HANDLE file;
63: DWORD flags;
64: public:
65:
66: transmitfile_control_block(SOCKET dst)
67: : s(dst), file(NULL), flags(TF_DISCONNECT | TF_REUSE_SOCKET) {}
68:
69: transmitfile_control_block(SOCKET dst, HANDLE src)
70: : s(dst), file(src), flags(0) {}
71:
72: virtual bool start_overlapped();
73: };
74:
75:
76:
77: class DEMUX_EXTERN wsasocketio_control_block : public iocp_wakeup {
78: protected:
79: enum { NUM_WBUFS = 1 };
80: WSABUF wbufs[NUM_WBUFS];
81: public:
82: SOCKET s;
83: sel_param* ppb;
84: int error;
85: bool reading;
86:
87:
88: wsasocketio_control_block(SOCKET src, sel_param* pb, bool read);
89:
90: virtual bool start_overlapped();
91:
92: virtual void iocp_op_finished( DWORD nbytes, ULONG_PTR udat,
93: LPOVERLAPPED olp, int err);
94: };
95:
96:
97: class DEMUX_EXTERN winfileio_control_block : public iocp_wakeup {
98: bool reading;
99: public:
100:
101: sel_param pb;
102: HANDLE file;
103:
104:
105: winfileio_control_block(HANDLE f, void* buf, int len, bool read);
106:
107: virtual bool start_overlapped();
108:
109:
110: };
111:
112: }}
113:
114:
Start cpp section to demux/demux_kqueue_demuxer.cpp[1
/1
]
1: #line 2802 "./lpsrc/flx_demux.pak"
2:
3:
4:
5:
6:
7:
8:
9:
10:
11:
12:
13:
14:
15:
16:
17:
18:
19: namespace flx { namespace demux {
20: kqueue_demuxer::kqueue_demuxer()
21: : kq(-1)
22: {
23:
24:
25: kq = kqueue();
26: if(-1 == kq)
27: {
28: perror("kqueue");
29: throw -1;
30: }
31: }
32:
33: kqueue_demuxer::~kqueue_demuxer()
34: {
35:
36:
37:
38:
39:
40:
41:
42:
43: async_quit();
44:
45:
46:
47: if(-1 != kq && close(kq) == -1)
48: perror("kqueue close");
49: }
50:
51:
52:
53:
54:
55:
56:
57:
58: int
59: kqueue_demuxer::add_socket_wakeup(socket_wakeup* sv, int flags)
60: {
61:
62: if((flags & ~(PDEMUX_READ | PDEMUX_WRITE))) return -1;
63:
64:
65:
66:
67:
68: if(flags & PDEMUX_READ)
69: {
70: if(add_kqueue_filter(sv, EVFILT_READ) == -1) return -1;
71: }
72:
73: if(flags & PDEMUX_WRITE)
74: {
75: if(add_kqueue_filter(sv, EVFILT_WRITE) == -1) return -1;
76: }
77:
78: return 0;
79: }
80:
81: int
82: kqueue_demuxer::add_kqueue_filter(socket_wakeup* sv, short filter)
83: {
84: int s = sv->s;
85: struct kevent evt;
86:
87:
88:
89:
90:
91:
92:
93:
94:
95:
96:
97:
98:
99: EV_SET(&evt, s, filter, EV_ADD | EV_ONESHOT, 0, 0, sv);
100:
101:
102:
103:
104: if(kevent(kq, &evt, 1, NULL, 0, NULL) < 0)
105: {
106: perror("kevent add_kqueue_filter");
107: return -1;
108: }
109: return 0;
110: }
111:
112:
113:
114:
115:
116:
117:
118:
119:
120:
121: int
122: kqueue_demuxer::remove_kqueue_filter(int s, short filter)
123: {
124: struct kevent evt;
125:
126: EV_SET(&evt, s, filter, EV_DELETE, 0, 0, NULL);
127: if(kevent(kq, &evt, 1, NULL, 0, NULL) < 0)
128: {
129: perror("kevent remove_socket_wakeup");
130: return -1;
131: }
132:
133: return 0;
134: }
135:
136: int
137: kqueue_demuxer::remove_socket_wakeup(int s, int flags)
138: {
139: int r1 = 0, r2 = 0;
140:
141: if(flags & PDEMUX_READ) r1 = remove_kqueue_filter(s, EVFILT_READ);
142: if(flags & PDEMUX_WRITE) r2 = remove_kqueue_filter(s, EVFILT_WRITE);
143:
144:
145: if(r1 || r2) return -1;
146:
147: return 0;
148: }
149:
150:
151:
152:
153:
154:
155:
156:
157: /*
158: fprintf(stderr,"readevt on %i, EOF = %s\n",
159: s, (ev.flags & EV_EOF) ? "TRUE" : "FALSE");
160: */
161:
162:
163:
164: void
165: kqueue_demuxer::get_evts(bool poll)
166: {
167:
168: struct kevent ev;
169: int nevts;
170:
171: struct timespec timeout, *tptr = NULL;
172:
173: if(poll)
174: {
175: timeout.tv_sec = 0;
176: timeout.tv_nsec = 0;
177: tptr = &timeout;
178: }
179:
180:
181:
182:
183: nevts = kevent(kq, NULL, 0, &ev, 1, tptr);
184:
185: if(nevts <= 0)
186: {
187:
188: if(nevts < 0)
189: perror("kevent event fetch");
190:
191: return;
192: }
193:
194:
195:
196: socket_wakeup* sv = (socket_wakeup*)ev.udata;
197:
198:
199:
200:
201: if(ev.filter == EVFILT_READ)
202: {
203:
204:
205: /*
206:
207: if(NULL == sv)
208: {
209: int backlog = (int)ev.data;
210:
211: for(int i = 0; i < backlog; i++) handle_connection();
212: }
213: else
214: */
215:
216: if(0 && ev.flags & EV_EOF)
217: {
218:
219: fprintf(stderr,
220: "got EV_EOF on read, %i bytes remain in buffer, errno=%i\n",
221: (int)ev.data, ev.fflags);
222: }
223:
224:
225:
226: sv->wakeup_flags = PDEMUX_READ;
227: sv->wakeup(*this);
228: }
229: else if(ev.filter == EVFILT_WRITE)
230: {
231:
232:
233:
234:
235:
236:
237: if(ev.flags & EV_EOF)
238: {
239:
240:
241: fprintf(stderr,
242: "got EV_EOF on write, data bytes =%i (0?), errno/fflags?=%i\n",
243: (int)ev.data, ev.fflags);
244: }
245:
246: sv->wakeup_flags = PDEMUX_WRITE;
247: sv->wakeup(*this);
248: }
249: else
250: {
251: fprintf(stderr,"unsolicited event from kqueue (%i)...\n", ev.filter);
252:
253: }
254: }
255: }}
256:
Start cpp section to demux/demux_pfileio.cpp[1
/1
]
1: #line 3059 "./lpsrc/flx_demux.pak"
2:
3:
4:
5:
6:
7:
8:
9:
10:
11:
12:
13:
14:
15:
16:
17:
18:
19: namespace flx { namespace demux {
20:
21:
22:
23:
24:
25:
26: fileio_request::~fileio_request(){}
27: fileio_request::fileio_request(){}
28:
29: fileio_request::fileio_request(int f, char* buf, long len, long off, bool rd)
30: : offset(off), fd(f), read_flag(rd), err(0)
31: {
32: pb.buffer = buf;
33: pb.buffer_size = len;
34: pb.bytes_written = 0;
35: }
36:
37:
38: void
39: fileio_request::doit()
40: {
41:
42:
43:
44:
45:
46: ssize_t res;
47:
48: if(read_flag)
49: {
50:
51: res = read(fd, pb.buffer, pb.buffer_size);
52: }
53: else
54: {
55:
56: res = write(fd, pb.buffer, pb.buffer_size);
57: }
58:
59:
60: if(-1 == res)
61: {
62: err = errno;
63: fprintf(stderr,"faio error: %i\n", err);
64: }
65: else
66: {
67:
68: pb.bytes_written = res;
69: }
70: }
71: }}
72:
Start cpp section to demux/demux_posix_demuxer.cpp[1
/1
]
1: #line 3132 "./lpsrc/flx_demux.pak"
2:
3:
4:
5:
6:
7:
8:
9:
10:
11:
12:
13:
14:
15:
16:
17: namespace flx { namespace demux {
18:
19: posix_demuxer::~posix_demuxer()
20: {
21: }
22:
23: bool
24: posix_demuxer::socket_recv(int s, sel_param* pb)
25: {
26:
27: assert(pb->buffer_size > pb->bytes_written || 0 == pb->buffer_size);
28: ssize_t nbytes;
29:
30:
31: nbytes = recv(s, pb->buffer + pb->bytes_written,
32: pb->buffer_size - pb->bytes_written, 0);
33:
34: if(nbytes <= 0)
35: {
36: if(nbytes == 0)
37: {
38: return true;
39: }
40: else
41: {
42: perror("recv");
43: return true;
44: }
45: }
46: else
47: {
48:
49: pb->bytes_written += nbytes;
50: }
51: return false;
52: }
53:
54: bool
55: posix_demuxer::socket_send(int s, sel_param* pb)
56: {
57:
58:
59:
60:
61: assert(pb->buffer_size > pb->bytes_written || 0 == pb->buffer_size);
62:
63: ssize_t nbytes;
64:
65: nbytes = send(s, pb->buffer + pb->bytes_written,
66: pb->buffer_size - pb->bytes_written, 0);
67:
68:
69:
70:
71:
72: if(-1 == nbytes)
73: {
74: perror("send");
75: return true;
76: }
77: else
78: {
79:
80: pb->bytes_written += nbytes;
81: }
82: return false;
83: }
84:
85:
86: void
87: posix_demuxer::async_quit()
88: {
89: try {
90:
91:
92:
93:
94:
95:
96:
97:
98:
99: demux_quitter quitter;
100: quitter.quit(this);
101: } catch(...) {
102: fprintf(stderr, "error waking demuxer with self pipe quitter\n");
103: }
104: }
105:
106: #if 0
107:
108:
109:
110:
111:
112:
113:
114:
115: nbytes = read(s, pb->buffer + pb->bytes_written,
116: pb->buffer_size - pb->bytes_written);
117: #endif
118:
119:
120:
121: int
122: accept_control_block::start(posix_demuxer& demux)
123: {
124:
125:
126:
127:
128:
129: accepted = -1;
130:
131:
132:
133: socket_err = EINPROGRESS;
134: return demux.add_socket_wakeup(this, PDEMUX_READ);
135: }
136:
137:
138: void
139: accept_control_block::wakeup(posix_demuxer& demux)
140: {
141:
142:
143:
144:
145: accepted = nice_accept(s, &socket_err);
146:
147: if(accepted == -1)
148: {
149: fprintf(stderr, "nice_accept failed, err (%i)\n", socket_err);
150: }
151: }
152:
153:
154:
155: int
156: connect_control_block::start(posix_demuxer& demux)
157: {
158:
159:
160: int finished;
161:
162:
163:
164: s = async_connect(addy, p, &finished, &socket_err);
165:
166:
167:
168:
169: if(-1 == s)
170: {
171: fprintf(stderr,"async_connect failed (%i)\n", socket_err);
172: return -1;
173: }
174:
175: if(finished)
176: {
177:
178: fprintf(stderr,"async_connect finished immediately, waking\n");
179: fprintf(stderr, "No wakeup coming...\n");
180:
181:
182:
183: return -1;
184: }
185:
186:
187:
188:
189:
190: return demux.add_socket_wakeup(this, PDEMUX_WRITE);
191: }
192:
193: void
194: connect_control_block::wakeup(posix_demuxer& demux)
195: {
196:
197:
198:
199: if(get_socket_error(s, &socket_err) == -1)
200: fprintf(stderr, "eep - get_socket_err failed!\n");
201:
202:
203: if(0 != socket_err)
204: {
205: fprintf(stderr,"async connect error: %s (%i), closing\n",
206: strerror(socket_err), socket_err);
207:
208: if(close(s) != 0)
209: perror("async socket close");
210:
211: s = -1;
212: }
213:
214:
215: }
216: }}
217:
Start cpp section to demux/demux_select_demuxer.cpp[1
/1
]
1: #line 3350 "./lpsrc/flx_demux.pak"
2:
3:
4:
5:
6:
7:
8:
9:
10:
11:
12:
13:
14:
15:
16:
17:
18:
19:
20:
21:
22:
23:
24:
25:
26:
27:
28: namespace flx { namespace demux {
29:
30: select_demuxer::select_demuxer()
31: {
32:
33:
34: FD_ZERO(&master_read_set);
35: FD_ZERO(&master_write_set);
36: FD_ZERO(&master_except_set);
37: fdmax = 0;
38:
39:
40:
41:
42: std::uninitialized_fill_n(svs,FD_SETSIZE,(socket_wakeup*)0);
43: }
44:
45:
46:
47: void
48: select_demuxer::get_evts(bool poll)
49: {
50:
51:
52:
53: fd_set read_set, write_set, except_set;
54:
55: copy_sets(read_set, write_set, except_set);
56:
57: if(select(read_set, write_set, except_set, poll))
58: process_sets(read_set, write_set, except_set);
59: }
60:
61: int
62: select_demuxer::add_socket_wakeup(socket_wakeup* sv, int flags)
63: {
64: int s = sv->s;
65:
66:
67:
68: if(s < 0 || s >= FD_SETSIZE) return -1;
69:
70: assert(svs[s] == NULL);
71:
72: if(flags & PDEMUX_READ) FD_SET(s, &master_read_set);
73:
74: if(flags & PDEMUX_WRITE) FD_SET(s, &master_write_set);
75:
76:
77:
78: FD_SET(s, &master_except_set);
79:
80: svs[s] = sv;
81:
82:
83: if(s > fdmax) fdmax = s;
84:
85: return 0;
86: }
87:
88:
89: void
90: select_demuxer::remove_fd(int s)
91: {
92:
93:
94: assert(s >= 0 && s < FD_SETSIZE);
95: assert(svs[s] != NULL);
96:
97:
98: FD_CLR(s, &master_read_set);
99: FD_CLR(s, &master_write_set);
100: FD_CLR(s, &master_except_set);
101:
102: svs[s] = NULL;
103: }
104:
105:
106: void
107: select_demuxer::copy_sets(fd_set& rset, fd_set& wset, fd_set& exset)
108: {
109: rset = master_read_set;
110: wset = master_write_set;
111: exset = master_except_set;
112: }
113:
114: bool
115: select_demuxer::select(fd_set& rset, fd_set& wset, fd_set& exset, bool poll)
116: {
117:
118:
119: struct timeval tv, *tp = NULL;
120:
121: if(poll)
122: {
123: tv.tv_sec = 0;
124: tv.tv_usec = 0;
125: tp = &tv;
126: }
127:
128:
129:
130:
131:
132:
133: switch(::select(fdmax+1, &rset, &wset, &exset, tp))
134: {
135: case 0:
136: return false;
137: break;
138: case -1:
139:
140:
141:
142:
143:
144:
145:
146:
147: perror("select");
148:
149: break;
150: }
151: return true;
152: }
153:
154: void
155: select_demuxer::process_sets(fd_set& rset, fd_set& wset, fd_set& exset)
156: {
157:
158:
159:
160:
161:
162:
163:
164:
165:
166:
167:
168: int new_fdmax = 0;
169:
170: for(int i = 0; i <= fdmax; i++)
171: {
172: int flags = 0;
173:
174: if(FD_ISSET(i, &rset)) flags |= PDEMUX_READ;
175:
176: if(FD_ISSET(i, &wset)) flags |= PDEMUX_WRITE;
177:
178:
179:
180: if(FD_ISSET(i, &exset))
181: {
182:
183:
184:
185:
186:
187:
188:
189:
190:
191:
192:
193: fprintf(stderr, "select error on socket %i, flags=%x\n",
194: i, flags);
195:
196: int err;
197:
198:
199:
200: if(get_socket_error(i, &err) == -1)
201: fprintf(stderr, "get_socket_error failed!?!\n");
202:
203: fprintf(stderr, "socket err = %i, %s\n", err, strerror(err));
204:
205:
206: }
207:
208:
209: if(flags)
210: {
211: socket_wakeup* sv = svs[i];
212:
213:
214: remove_fd(i);
215:
216: sv->wakeup_flags = flags;
217: sv->wakeup(*this);
218: }
219:
220:
221: if(svs[i]) new_fdmax = i;
222: }
223:
224:
225:
226: fdmax = new_fdmax;
227: }
228:
229: }}
230:
Start cpp section to demux/demux_ts_select_demuxer.cpp[1
/1
]
1: #line 3581 "./lpsrc/flx_demux.pak"
2:
3:
4:
5:
6: namespace flx { namespace demux {
7:
8: ts_select_demuxer::ts_select_demuxer()
9: {
10:
11:
12: sp.install(&demux);
13: }
14:
15: ts_select_demuxer::~ts_select_demuxer()
16: {
17: async_quit();
18: }
19:
20: void
21: ts_select_demuxer::get_evts(bool poll)
22: {
23: fd_set rset, wset, exset;
24:
25:
26: {
27: flx::pthread::flx_mutex_locker_t locker(ham_fist);
28: demux.copy_sets(rset, wset, exset);
29: }
30:
31:
32:
33:
34:
35:
36:
37: if(demux.select(rset, wset, exset, poll))
38: {
39: flx::pthread::flx_mutex_locker_t locker(ham_fist);
40: demux.process_sets(rset, wset, exset);
41: }
42: }
43:
44:
45:
46: int
47: ts_select_demuxer::add_socket_wakeup(socket_wakeup* sv, int flags)
48: {
49:
50:
51: flx::pthread::flx_mutex_locker_t locker(ham_fist);
52:
53: int res = demux.add_socket_wakeup(sv, flags);
54:
55:
56:
57:
58: if(-1 != res) sp.wake();
59:
60: return res;
61:
62:
63:
64:
65: }
66:
67: }}
68:
Start cpp section to demux/demux_sockety.hpp[1
/1
]
1: #line 3650 "./lpsrc/flx_demux.pak"
2:
3:
4:
5: namespace flx { namespace demux {
6:
7:
8:
9: int create_listener_socket(int* io_port, int q_len);
10: int create_async_listener(int* io_port, int q_len);
11: int nice_accept(int listener, int* err);
12: int nice_connect(const char* addr, int port);
13: int async_connect(const char* addr, int port, int* finished, int* err);
14:
15: /* handy socket building blocks */
16:
17: int connect_sock(int s, const char* addr, int port);
18:
19: /* this could possibly do with NIC addr as well as port */
20: int bind_sock(int s, int* io_port);
21:
22: int make_nonblock(int s);
23: int set_tcp_nodelay(int s, int disable_nagle);
24: int get_socket_error(int s, int* socket_err);
25:
26: }}
27:
28:
Start cpp section to demux/demux_quitter.hpp[1
/1
]
1: #line 3679 "./lpsrc/flx_demux.pak"
2:
3:
4:
5:
6:
7:
8:
9:
10:
11:
12:
13:
14:
15:
16:
17: namespace flx { namespace demux {
18:
19:
20:
21: class DEMUX_EXTERN async_bool : public demux_quit_flag {
22: flx::pthread::flx_mutex_t cv_lock;
23: flx::pthread::flx_condv_t finished_cond;
24: bool finished;
25: public:
26: async_bool();
27:
28: void wait_until_true();
29: virtual void signal_true();
30: };
31:
32:
33: class DEMUX_EXTERN demux_quitter : public demux_callback {
34:
35: #ifdef _WIN32
36: wself_piper sp;
37: #else
38: self_piper sp;
39: #endif
40: async_bool finished;
41: void callback(demuxer* demux);
42: public:
43: void quit(demuxer* demux);
44: };
45:
46: /*
47: class DEMUX_EXTERN wdemux_quitter : demux_callback {
48: wself_piper sp;
49: async_bool finished;
50: public:
51: void quit(iocp_demuxer* demux);
52:
53: void callback(demuxer* d);
54: };
55: */
56:
57: } }
58:
59:
60:
Start cpp section to demux/demux_quitter.cpp[1
/1
]
1: #line 3740 "./lpsrc/flx_demux.pak"
2:
3:
4:
5: namespace flx { namespace demux {
6:
7: async_bool::async_bool()
8: : finished(false)
9: {
10:
11: }
12:
13:
14: void
15: async_bool::wait_until_true()
16: {
17: flx::pthread::flx_mutex_locker_t locker(cv_lock);
18:
19:
20: while(!finished)
21: {
22: finished_cond.wait(&cv_lock);
23: }
24: }
25:
26:
27: void
28: async_bool::signal_true()
29: {
30: finished = true;
31: finished_cond.signal();
32:
33: }
34:
35: void
36: demux_quitter::callback(demuxer* demux)
37: {
38:
39: demux->set_quit_flag(&finished);
40: }
41:
42: void
43: demux_quitter::quit(demuxer* demux)
44: {
45:
46:
47: sp.install(demux, this);
48:
49: sp.wake();
50:
51: finished.wait_until_true();
52:
53: }
54:
55: /*
56:
57: void
58: wdemux_quitter::quit(iocp_demuxer* demux)
59: {
60: fprintf(stderr, "wdemux_quitter::quit\n");
61:
62:
63: sp.install(demux, this);
64: sp.wake();
65: fprintf(stderr, "wdemux_quitter::quit waiting on finished flag\n");
66: finished.wait_until_true();
67: fprintf(stderr, "wdemux_quitter::quit exiting\n");
68: }
69:
70: void
71: wdemux_quitter::callback(demuxer* d)
72: {
73: fprintf(stderr, "wdemux_quitter got called back by demuxer (%p)!\n", d);
74:
75: d->set_quit_flag(&finished);
76: }
77: */
78:
79: } }
80:
Start cpp section to demux/demux_self_piper.hpp[1
/1
]
1: #line 3821 "./lpsrc/flx_demux.pak"
2:
3:
4:
5:
6:
7:
8:
9: namespace flx { namespace demux {
10:
11:
12:
13:
14: class DEMUX_EXTERN selfpipe_wakeup : public socket_wakeup {
15: public:
16: demux_callback* cb;
17:
18: virtual void wakeup(posix_demuxer& demux);
19: };
20:
21: class DEMUX_EXTERN auto_fd {
22: public:
23: int fd;
24:
25: auto_fd();
26: ~auto_fd();
27: };
28:
29:
30: class DEMUX_EXTERN pipe_pair {
31:
32: auto_fd fds[2];
33: public:
34: pipe_pair();
35:
36: void write_byte();
37: int get_read_end();
38: };
39:
40:
41:
42: class DEMUX_EXTERN self_piper {
43: pipe_pair pp;
44: selfpipe_wakeup spw;
45: public:
46: void install(demuxer* demux, demux_callback* cb = 0);
47: void wake();
48: };
49:
50: }}
51:
52:
Start cpp section to demux/demux_self_piper.cpp[1
/1
]
1: #line 3874 "./lpsrc/flx_demux.pak"
2:
3:
4:
5:
6:
7:
8: namespace flx { namespace demux {
9:
10: auto_fd::auto_fd()
11: {
12: fd = -1;
13: }
14:
15: auto_fd::~auto_fd()
16: {
17: if(-1 == fd) return;
18:
19: if(close(fd) == -1)
20: perror("auto fd close");
21: }
22:
23: void
24: self_piper::install(demuxer* d, demux_callback* cb)
25: {
26:
27: posix_demuxer* demux = static_cast<posix_demuxer*>(d);
28: spw.s = pp.get_read_end();
29: spw.cb = cb;
30:
31: int res = demux->add_socket_wakeup(&spw, PDEMUX_READ);
32: assert(-1 != res);
33: }
34:
35:
36: void
37: self_piper::wake()
38: {
39:
40: pp.write_byte();
41: }
42:
43:
44: /*
45: void
46: self_piper::inspect(int s)
47: {
48:
49:
50:
51: if(s != fds[0].fd) wake();
52: }
53: */
54:
55: void
56: selfpipe_wakeup::wakeup(posix_demuxer& demux)
57: {
58:
59:
60:
61: ssize_t nbytes;
62: char b;
63:
64:
65:
66: nbytes = read(s, &b, 1);
67:
68: if(nbytes == -1) perror("read");
69:
70:
71: assert(nbytes == 1 && b == 1);
72:
73:
74: if(cb) cb->callback(&demux);
75:
76:
77:
78:
79: int res = demux.add_socket_wakeup(this, PDEMUX_READ);
80: assert(-1 != res);
81: }
82:
83: pipe_pair::pipe_pair()
84: {
85:
86:
87: int self_pipe_fds[2];
88: if(pipe(self_pipe_fds) == -1)
89: {
90: perror("ts_select_demuxer::self_pipe");
91: throw -1;
92: }
93:
94:
95:
96:
97: fds[0].fd = self_pipe_fds[0];
98: fds[1].fd = self_pipe_fds[1];
99: }
100:
101:
102: /*
103: void
104: pipe_pair::read_byte()
105: {
106: ssize_t nbytes;
107: char b;
108:
109:
110:
111: nbytes = read(fds[0].fd, &b, 1);
112:
113: if(nbytes == -1) perror("read");
114:
115:
116: assert(nbytes == 1 && b == 1);
117: }
118: */
119:
120: void
121: pipe_pair::write_byte()
122: {
123: char b = 1;
124: ssize_t nbytes;
125:
126: nbytes = write(fds[1].fd, &b, 1);
127:
128:
129:
130: if(-1 == nbytes) perror("pipe_pair::write_byte");
131: assert(1 == nbytes);
132: }
133:
134: int
135: pipe_pair::get_read_end()
136: {
137: return fds[0].fd;
138: }
139:
140: } }
141:
Start cpp section to demux/demux_wself_piper.hpp[1
/1
]
1: #line 4016 "./lpsrc/flx_demux.pak"
2:
3:
4:
5:
6:
7:
8:
9: namespace flx { namespace demux {
10:
11: class DEMUX_EXTERN auto_handle {
12: public:
13: HANDLE h;
14:
15: auto_handle();
16: ~auto_handle();
17: };
18:
19:
20: class DEMUX_EXTERN wpipe_pair {
21: enum { READ_END, WRITE_END };
22:
23: auto_handle pipe[2];
24: public:
25: wpipe_pair();
26:
27: void write_byte();
28: HANDLE get_read_end() { return pipe[READ_END].h; }
29: };
30:
31:
32:
33:
34:
35:
36: class DEMUX_EXTERN wself_piper_wakeup : public winfileio_control_block
37: {
38: char the_byte;
39:
40: public:
41:
42: demux_callback* cb;
43:
44:
45: iocp_demuxer* d;
46:
47:
48: wself_piper_wakeup();
49:
50:
51:
52: virtual void iocp_op_finished(DWORD nbytes, ULONG_PTR udat,
53: LPOVERLAPPED olp, int err);
54:
55: void arm();
56: };
57:
58:
59:
60: class DEMUX_EXTERN wself_piper {
61: wpipe_pair pp;
62: wself_piper_wakeup spw;
63: public:
64: void install(demuxer* demux, demux_callback* cb = 0);
65: void wake();
66: };
67:
68: } }
69:
70:
71:
Start cpp section to demux/demux_wself_piper.cpp[1
/1
]
1: #line 4088 "./lpsrc/flx_demux.pak"
2:
3:
4:
5:
6: namespace flx { namespace demux {
7:
8: auto_handle::auto_handle()
9: {
10: h = INVALID_HANDLE_VALUE;
11: }
12:
13: auto_handle::~auto_handle()
14: {
15: if(INVALID_HANDLE_VALUE == h) return;
16:
17: if(!CloseHandle(h))
18: fprintf(stderr, "auto CloseHandle failed: %i\n", GetLastError());
19: }
20:
21: wpipe_pair::wpipe_pair()
22: {
23:
24:
25:
26:
27:
28:
29: const char* pname = "\\\\.\\pipe\\flx_iocp_quitter";
30:
31:
32:
33:
34:
35:
36: pipe[READ_END].h = CreateNamedPipe(pname,
37: PIPE_ACCESS_DUPLEX | FILE_FLAG_OVERLAPPED,
38: PIPE_TYPE_BYTE, 1, 256, 256, 0, NULL);
39:
40: if(INVALID_HANDLE_VALUE == pipe[READ_END].h)
41: {
42: fprintf(stderr, "couldn't create named pipe: %i\n", GetLastError());
43: throw -1;
44: }
45:
46:
47:
48:
49:
50:
51: pipe[WRITE_END].h = CreateFile(pname, FILE_READ_DATA | FILE_WRITE_DATA,
52: FILE_SHARE_READ | FILE_SHARE_WRITE, NULL, OPEN_EXISTING,
53: FILE_ATTRIBUTE_NORMAL | FILE_FLAG_OVERLAPPED, NULL);
54:
55: if(INVALID_HANDLE_VALUE == pipe[WRITE_END].h)
56: {
57: fprintf(stderr, "failed to open named pipe: %i\n", GetLastError());
58: throw -1;
59: }
60:
61:
62:
63: /*
64: if(!CreatePipe(&pipe[READ_END], &pipe[WRITE_END], NULL, 1))
65: {
66: fprintf(stderr, "wpipe_pair CreatePipe failed: %i\n", GetLastError());
67: throw -1;
68: }
69: */
70: }
71:
72: void
73: wpipe_pair::write_byte()
74: {
75:
76: char b = 1;
77: DWORD bytes_written;
78:
79: if(!WriteFile(pipe[WRITE_END].h, &b, 1, &bytes_written, NULL))
80: fprintf(stderr, "wpipe_pair failed to write byte: %i\n",
81: GetLastError());
82: }
83:
84: void
85: wself_piper::install(demuxer* d, demux_callback* cb)
86: {
87: fprintf(stderr, "wself_piper::install(%p, %p)\n", d, cb);
88: iocp_demuxer* demux = static_cast<iocp_demuxer*>(d);
89:
90:
91: HANDLE read_end = pp.get_read_end();
92:
93: #if 0
94:
95:
96: DWORD pipe_mode = PIPE_NOWAIT;
97: if(!SetNamedPipeHandleState(read_end, &pipe_mode, NULL, NULL))
98: {
99: fprintf(stderr, "SetNamedPipeHandleState failed: %i\n", GetLastError());
100: return;
101: }
102: #endif
103:
104: if(0 != demux->associate_with_iocp(read_end, NULL))
105: {
106: fprintf(stderr, "failed to install self pipe in IOCP!!!\n");
107: return;
108: }
109:
110:
111: spw.d = demux;
112: spw.cb = cb;
113: spw.file = read_end;
114:
115: fprintf(stderr, "initial self pipe arm\n");
116: spw.arm();
117: }
118:
119:
120: void
121: wself_piper::wake()
122: {
123: fprintf(stderr, "wself_piper::wake - write a byte\n");
124: pp.write_byte();
125: }
126:
127: wself_piper_wakeup::wself_piper_wakeup()
128:
129:
130: : winfileio_control_block(INVALID_HANDLE_VALUE, NULL, 0, true),
131: cb(0), d(0)
132: {
133:
134: fprintf(stderr, "SET UP THE PIPE HANDLE!\n");
135: }
136:
137:
138:
139: void
140: wself_piper_wakeup::iocp_op_finished(DWORD nbytes, ULONG_PTR udat,
141: LPOVERLAPPED olp, int err)
142: {
143: fprintf(stderr, "wself_piper_wakeup::iocp_op_finished\n");
144: fprintf(stderr, "nbytes=%i, err=%i\n", nbytes, err);
145: fprintf(stderr, "about to callback %p(%p)\n", cb, d);
146:
147: if(cb) cb->callback(d);
148:
149: arm();
150: }
151:
152: void
153: wself_piper_wakeup::arm()
154: {
155:
156: fprintf(stderr, "wself_piper_wakeup::arm\n");
157: pb.buffer = &the_byte;
158: pb.buffer_size = 1;
159: pb.bytes_written = 0;
160: if(start_overlapped())
161: fprintf(stderr, "WARNING: wslef_pipe install completed immediately\n");
162: }
163:
164: } }
165:
Start cpp section to demux/demux_poll_demuxer.hpp[1
/1
]
1: #line 4254 "./lpsrc/flx_demux.pak"
2:
3:
4:
5:
6:
7:
8:
9:
10: namespace flx { namespace demux {
11:
12: class DEMUX_EXTERN poll_demuxer : public posix_demuxer {
13: void* fd_array;
14: void* sv_array;
15:
16: virtual void get_evts(bool poll);
17: public:
18: poll_demuxer();
19: virtual ~poll_demuxer();
20:
21: virtual int add_socket_wakeup(socket_wakeup* sv, int flags);
22:
23: void get_arrays(void** fds, void** svs);
24: int dopoll(void* infds, bool poll_flag);
25: void process_evts(void* infds, void* svs, int nevts);
26: };
27:
28: } }
29:
30:
Start cpp section to demux/demux_poll_demuxer.cpp[1
/1
]
1: #line 4285 "./lpsrc/flx_demux.pak"
2:
3:
4:
5:
6:
7:
8:
9: namespace flx { namespace demux {
10:
11: using namespace std;
12:
13: typedef vector<socket_wakeup*> sockvec;
14: typedef vector<struct pollfd> fdvec;
15:
16: #define FDS ((fdvec*)fd_array)
17: #define SOCS ((sockvec*)sv_array)
18:
19:
20:
21:
22:
23:
24:
25: poll_demuxer::poll_demuxer()
26: : fd_array(0), sv_array(0)
27: {
28:
29: }
30:
31: poll_demuxer::~poll_demuxer()
32: {
33:
34: if(SOCS) delete SOCS;
35: if(FDS) delete FDS;
36: }
37:
38:
39: void
40: poll_demuxer::get_arrays(void** fds, void** svs)
41: {
42: *fds = fd_array;
43: *svs = sv_array;
44:
45: fd_array = 0;
46: sv_array = 0;
47: }
48:
49:
50: int
51: poll_demuxer::dopoll(void* infds, bool poll_flag)
52: {
53: fdvec* fds_copy = (fdvec*)infds;
54:
55: if(!fds_copy)
56: {
57: if(!poll_flag) fprintf(stderr, "Warning ::poll(\\inf) on zero fds!\n");
58: return 0;
59: }
60:
61: struct pollfd* fds = &(*fds_copy)[0];
62:
63:
64: unsigned long nfds = (*fds_copy).size();
65:
66: int nevts;
67:
68:
69:
70: nevts = ::poll(fds, nfds, (poll_flag) ? 0 : -1);
71:
72: if(-1 == nevts)
73: {
74: perror("poll_demuxer::get_evts");
75: return 0;
76: }
77:
78: return nevts;
79: }
80:
81:
82:
83: void
84: poll_demuxer::process_evts(void* infds, void* svs, int nevts)
85: {
86:
87:
88:
89:
90:
91:
92: if(0 == nevts && !fd_array)
93: {
94:
95: assert( !sv_array );
96: fd_array = infds;
97: sv_array = svs;
98: return;
99: }
100:
101: fdvec* fds_copy = (fdvec*)infds;
102: sockvec* socs_copy = (sockvec*)svs;
103:
104: struct pollfd* fds = &(*fds_copy)[0];
105: unsigned long nfds = (*fds_copy).size();
106:
107:
108:
109: int evts_encountered = 0;
110:
111:
112:
113: for(unsigned long i = 0; i < nfds; i++, fds++)
114: {
115:
116:
117: socket_wakeup* sv = (*socs_copy)[i];
118:
119:
120:
121: int wakeup_flags = 0;
122:
123: bool wake = false;
124:
125:
126:
127: if(fds->revents & POLLIN)
128: {
129:
130: wakeup_flags |= PDEMUX_READ;
131: wake = true;
132: evts_encountered++;
133: }
134:
135: if(fds->revents & POLLOUT)
136: {
137:
138: wakeup_flags |= PDEMUX_WRITE;
139: wake = true;
140: evts_encountered++;
141: }
142:
143:
144: if(fds->revents & POLLERR)
145: {
146: fprintf(stderr, "POLLERR for %p->%i\n", sv, sv->s);
147: wake = true;
148: }
149:
150:
151:
152:
153:
154:
155:
156:
157: if(fds->revents & POLLHUP)
158: {
159: fprintf(stderr, "POLLHUP for %p->%i\n", sv, sv->s);
160: assert((fds->revents & POLLOUT) == 0);
161: wake = true;
162: }
163:
164:
165: if(fds->revents & POLLNVAL)
166: {
167: fprintf(stderr, "POLLNVAL for %p->%i\n", sv, sv->s);
168: wake = true;
169: }
170:
171: if(wake)
172: {
173:
174:
175: sv->wakeup_flags = wakeup_flags;
176: sv->wakeup(*this);
177: }
178: else
179: {
180:
181:
182:
183:
184:
185: if(add_socket_wakeup(sv, sv->wakeup_flags) == -1)
186: fprintf(stderr, "poll re-add finished immediately!?!\n");
187: }
188: }
189:
190:
191: if(evts_encountered != nevts)
192: {
193: fprintf(stderr, "poll seen/nevts mismatch: %i/%i\n",
194: evts_encountered, nevts);
195: }
196:
197:
198: delete fds_copy;
199: delete socs_copy;
200: }
201:
202:
203:
204: void
205: poll_demuxer::get_evts(bool poll_flag)
206: {
207:
208: void *fds, *svs;
209: int nevts;
210:
211: get_arrays(&fds, &svs);
212:
213:
214: nevts = this->dopoll(fds, poll_flag);
215:
216:
217:
218:
219: process_evts(fds, svs, nevts);
220: }
221:
222:
223: int
224: poll_demuxer::add_socket_wakeup(socket_wakeup* sv, int flags)
225: {
226:
227:
228:
229: if(!FDS)
230: {
231:
232: assert(SOCS == NULL);
233:
234: fd_array = new fdvec;
235: sv_array = new sockvec;
236: }
237:
238:
239: struct pollfd fd;
240:
241:
242:
243: sv->wakeup_flags = flags;
244:
245: fd.fd = sv->s;
246: fd.events = 0;
247:
248:
249:
250: fd.revents = 0;
251:
252: if(flags & PDEMUX_READ) fd.events |= POLLIN;
253: if(flags & PDEMUX_WRITE) fd.events |= POLLOUT;
254:
255:
256:
257:
258:
259:
260:
261:
262: assert(0 != fd.events);
263:
264:
265: FDS->push_back(fd);
266: SOCS->push_back(sv);
267:
268: return 0;
269: }
270:
271: } }
272:
Start cpp section to demux/demux_ts_poll_demuxer.hpp[1
/1
]
1: #line 4558 "./lpsrc/flx_demux.pak"
2:
3:
4:
5:
6:
7:
8:
9:
10:
11: namespace flx { namespace demux {
12:
13: class ts_poll_demuxer : public posix_demuxer {
14:
15: flx::pthread::flx_mutex_t ham_fist;
16:
17: poll_demuxer demux;
18:
19: self_piper sp;
20: protected:
21: virtual void get_evts(bool poll);
22: public:
23: ts_poll_demuxer();
24: ~ts_poll_demuxer();
25:
26: virtual int add_socket_wakeup(socket_wakeup* sv, int flags);
27:
28:
29: virtual demux_quit_flag* get_quit_flag() { return demux.get_quit_flag(); }
30: virtual void set_quit_flag(demux_quit_flag* f) { demux.set_quit_flag(f); }
31: };
32:
33: }}
34:
35:
36:
Start cpp section to demux/demux_ts_poll_demuxer.cpp[1
/1
]
1: #line 4595 "./lpsrc/flx_demux.pak"
2:
3:
4:
5: namespace flx { namespace demux {
6:
7: ts_poll_demuxer::ts_poll_demuxer()
8: {
9: fprintf(stderr, "ts_poll_demuxer installing self-piper\n");
10:
11: sp.install(&demux);
12: }
13:
14: ts_poll_demuxer::~ts_poll_demuxer()
15: {
16: fprintf(stderr, "ts_polling asking thread to quit\n");
17: async_quit();
18: fprintf(stderr, "ts_poll async quit finished\n");
19: }
20:
21: void
22: ts_poll_demuxer::get_evts(bool poll)
23: {
24: void *fds, *svs;
25:
26:
27: {
28: flx::pthread::flx_mutex_locker_t locker(ham_fist);
29: demux.get_arrays(&fds, &svs);
30:
31: }
32:
33:
34: int nevts = demux.dopoll(fds, poll);
35:
36:
37:
38: {
39: flx::pthread::flx_mutex_locker_t locker(ham_fist);
40: demux.process_evts(fds, svs, nevts);
41:
42: }
43: }
44:
45:
46:
47: int
48: ts_poll_demuxer::add_socket_wakeup(socket_wakeup* sv, int flags)
49: {
50:
51: flx::pthread::flx_mutex_locker_t locker(ham_fist);
52:
53: int res = demux.add_socket_wakeup(sv, flags);
54:
55:
56: if(-1 != res) sp.wake();
57:
58: return res;
59: }
60: }}