5. Sockets
Start cpp section to faio/faio_posixio.hpp[1
/1
]
1: #line 870 "./lpsrc/flx_faio.pak"
2:
3:
4:
5:
6:
7:
8:
9:
10:
11:
12:
13:
14:
15:
16:
17:
18:
19:
20:
21: namespace flx { namespace faio {
22:
23: class FAIO_EXTERN socketio_wakeup : public demux::socket_wakeup {
24: public:
25: demux::sel_param pb;
26: thread_wakeup fw;
27:
28: int sio_flags;
29:
30: virtual void wakeup(demux::posix_demuxer& demux);
31: };
32:
33:
34:
35:
36:
37: class FAIO_EXTERN socketio_request : public flx_driver_request_base {
38: public:
39: socketio_wakeup sv;
40:
41: socketio_request() {}
42:
43: socketio_request(int s, char* buf, long len, bool r);
44: virtual bool start_async_op(demux::demuxer& demux, flx_drv* drv, void* f);
45: };
46:
47: class FAIO_EXTERN connect_request
48: : public flx_driver_request_base, public demux::connect_control_block {
49: public:
50: thread_wakeup fw;
51:
52: connect_request() {}
53:
54: connect_request(const char* addr, int port) { addy = addr; p = port; s=-1; }
55: virtual bool start_async_op(demux::demuxer& demux, flx_drv* drv, void* f);
56: virtual void wakeup(demux::posix_demuxer& demux);
57: };
58:
59: class FAIO_EXTERN accept_request
60: : public flx_driver_request_base, public demux::accept_control_block {
61: public:
62:
63:
64:
65: thread_wakeup fw;
66:
67: accept_request() {}
68:
69:
70: accept_request(int listener) { s = listener; }
71:
72:
73: virtual bool start_async_op(demux::demuxer& demux, flx_drv* drv, void* f);
74:
75:
76: virtual void wakeup(demux::posix_demuxer& demux);
77: };
78:
79:
80:
81:
82: class FAIO_EXTERN flxfileio_request
83: : public flx_driver_request_base, public demux::fileio_request
84: {
85: thread_wakeup fw;
86: public:
87: flxfileio_request();
88: ~flxfileio_request();
89:
90: flxfileio_request(int f, char* buf, long len, long off, bool rd)
91: : fileio_request(f, buf, len, off, rd) {}
92:
93:
94: virtual bool start_async_op(demux::demuxer& demux, flx_drv* drv, void* f);
95:
96:
97: virtual void finished();
98: };
99:
100: }}
101:
Start cpp section to faio/faio_wdrv.hpp[1
/1
]
1: #line 972 "./lpsrc/flx_faio.pak"
2:
3:
4:
5:
6:
7:
8:
9: namespace flx { namespace faio {
10:
11: class FAIO_EXTERN wflx_drv : public flx_drv {
12: demux::win_timer_queue sleepers;
13: public:
14: wflx_drv(flx::pthread::sleep_queue_t& q, int n1, int m1, int n2, int m2);
15:
16: demux::timer_queue* get_sleepers() { return &sleepers; }
17: };
18:
19: }}
20:
21:
22:
Start cpp section to faio/faio_wdrv.cpp[1
/1
]
1: #line 995 "./lpsrc/flx_faio.pak"
2:
3:
4:
5:
6: namespace flx { namespace faio {
7:
8:
9: wflx_drv::wflx_drv(flx::pthread::sleep_queue_t& q,int n1,int m1,int n2, int m2) : flx_drv(q,n1,m1) {}
10:
11: }}
12:
Start cpp section to faio/faio_posixio.cpp[1
/1
]
1: #line 1008 "./lpsrc/flx_faio.pak"
2:
3:
4:
5:
6:
7:
8:
9:
10:
11:
12:
13:
14:
15:
16:
17: using namespace flx::demux;
18: namespace flx { namespace faio {
19: socketio_request::socketio_request(int s, char* buf, long len, bool read)
20: {
21: sv.s = s;
22:
23: sv.sio_flags = ((read) ? PDEMUX_READ : PDEMUX_WRITE);
24:
25: sv.pb.buffer = buf;
26: sv.pb.buffer_size = len;
27: sv.pb.bytes_written = 0;
28: }
29:
30: bool
31: socketio_request::start_async_op(demux::demuxer& demux, flx_drv* drv, void* f)
32: {
33:
34:
35:
36:
37: RECORD_THREAD_INFO(sv.fw);
38:
39: posix_demuxer* pd = (posix_demuxer*)&demux;
40:
41:
42: return (pd->add_socket_wakeup(&sv, sv.sio_flags) == -1);
43: }
44:
45:
46: void
47: socketio_wakeup::wakeup(posix_demuxer& demux)
48: {
49:
50:
51: bool connection_closed;
52:
53:
54:
55:
56:
57:
58:
59:
60: if(wakeup_flags & PDEMUX_READ)
61: {
62:
63: assert(wakeup_flags == PDEMUX_READ);
64: connection_closed = posix_demuxer::socket_recv(s, &pb);
65: }
66: else
67: {
68:
69: assert(wakeup_flags == PDEMUX_WRITE);
70: connection_closed = posix_demuxer::socket_send(s, &pb);
71: }
72:
73:
74:
75:
76:
77:
78: if(connection_closed || pb.bytes_written == pb.buffer_size)
79: {
80:
81:
82: fw.wake();
83: return;
84: }
85:
86:
87: if(demux.add_socket_wakeup(this, sio_flags) == -1)
88: fprintf(stderr,"failed to re-add_socket_wakeup\n");
89: }
90:
91:
92: bool
93: connect_request::start_async_op(demuxer& demux, flx_drv* drv, void* f)
94: {
95:
96:
97: RECORD_THREAD_INFO(fw);
98:
99: posix_demuxer* pd = (posix_demuxer*)&demux;
100:
101:
102: if(start(*pd) == -1) return true;
103:
104:
105:
106: return false;
107:
108: /*
109:
110: if(0 == socket_err) fprintf(stderr, "WOW, instant CONNECT\n");
111:
112:
113:
114: return 0 == socket_err;
115: */
116: }
117:
118: void
119: connect_request::wakeup(posix_demuxer& demux)
120: {
121:
122:
123:
124: connect_control_block::wakeup(demux);
125:
126:
127: fw.wake();
128: }
129:
130:
131:
132: bool
133: accept_request::start_async_op(demuxer& demux, flx_drv* drv, void* f)
134: {
135:
136: RECORD_THREAD_INFO(fw);
137:
138: posix_demuxer* pd = (posix_demuxer*)&demux;
139: return (start(*pd) == -1);
140: }
141:
142: void
143: accept_request::wakeup(posix_demuxer& demux)
144: {
145:
146: accept_control_block::wakeup(demux);
147:
148: if(accepted == -1)
149: {
150:
151: fprintf(stderr, "accept request failed (%i), retrying...\n",
152: socket_err);
153:
154: if(start(demux) == -1)
155: fprintf(stderr, "failed again... probably was a bad idea\n");
156: return;
157: }
158:
159: fw.wake();
160: }
161:
162:
163: flxfileio_request::~flxfileio_request(){}
164: flxfileio_request::flxfileio_request(){}
165:
166:
167: bool
168: flxfileio_request::start_async_op(demux::demuxer& demux, flx_drv* drv, void* f)
169: {
170:
171:
172: RECORD_THREAD_INFO(fw);
173:
174:
175:
176: pflx_drv* pdrv = (pflx_drv*)drv;
177: pdrv->get_aio_worker()->add_fileio_request(this);
178:
179: return false;
180: }
181:
182:
183: void
184: flxfileio_request::finished()
185: {
186:
187: fw.wake();
188: }
189:
190:
191: }}
192:
Start cpp section to faio/faio_winio.hpp[1
/1
]
1: #line 1201 "./lpsrc/flx_faio.pak"
2:
3:
4:
5:
6:
7:
8:
9:
10:
11:
12:
13:
14: namespace flx { namespace faio {
15:
16:
17:
18:
19:
20:
21:
22: class FAIO_EXTERN iocp_associator : public flx_driver_request_base {
23: SOCKET s;
24: public:
25:
26: iocp_associator() {}
27: iocp_associator(SOCKET associatee) : s(associatee) {}
28:
29: virtual bool start_async_op(demux::demuxer& demux, flx_drv* drv, void* f);
30: };
31:
32:
33:
34:
35: class FAIO_EXTERN waio_base : public flx_driver_request_base {
36: protected:
37: thread_wakeup fw;
38: public:
39: bool success;
40:
41: waio_base() : success(false) {}
42:
43:
44: virtual void iocp_op_finished( DWORD nbytes, ULONG_PTR udat,
45: LPOVERLAPPED olp, int err);
46: };
47:
48:
49:
50:
51:
52: class FAIO_EXTERN wasync_accept
53: : public waio_base, public demux::acceptex_control_block
54: {
55: public:
56: wasync_accept() {}
57:
58: wasync_accept(SOCKET l, SOCKET a) { listener = l; acceptor = a; }
59:
60: virtual bool start_async_op(demux::demuxer& demux, flx_drv* drv, void* f);
61:
62: virtual void iocp_op_finished( DWORD nbytes, ULONG_PTR udat,
63: LPOVERLAPPED olp, int err);
64: };
65:
66: class FAIO_EXTERN connect_ex
67: : public waio_base, public demux::connectex_control_block
68: {
69: public:
70:
71: connect_ex() {}
72:
73: connect_ex(SOCKET soc, const char* addr, int port)
74: { s = soc; addy = addr; p = port; }
75:
76: virtual bool start_async_op(demux::demuxer& demux, flx_drv* drv, void* f);
77:
78: virtual void iocp_op_finished( DWORD nbytes, ULONG_PTR udat,
79: LPOVERLAPPED olp, int err);
80: };
81:
82:
83: class FAIO_EXTERN wasync_transmit_file
84: : public waio_base, public demux::transmitfile_control_block
85: {
86: public:
87: wasync_transmit_file()
88: : transmitfile_control_block(INVALID_SOCKET, NULL) {}
89:
90: wasync_transmit_file(SOCKET dst)
91: : transmitfile_control_block(dst) {}
92:
93: wasync_transmit_file(SOCKET dst, HANDLE src)
94: : transmitfile_control_block(dst, src) {}
95:
96:
97: virtual bool start_async_op(demux::demuxer& demux, flx_drv* drv, void* f);
98:
99: virtual void iocp_op_finished(DWORD nbytes, ULONG_PTR udat,
100: LPOVERLAPPED olp, int err);
101: };
102:
103:
104: class FAIO_EXTERN wsa_socketio
105: : public waio_base, public demux::wsasocketio_control_block
106: {
107: public:
108: wsa_socketio()
109: : wsasocketio_control_block(INVALID_SOCKET, NULL, false) {}
110:
111: wsa_socketio(SOCKET src, demux::sel_param* ppb, bool read)
112: : wsasocketio_control_block(src, ppb, read) {}
113:
114: virtual bool start_async_op(demux::demuxer& demux, flx_drv* drv, void* f);
115:
116: virtual void iocp_op_finished( DWORD nbytes, ULONG_PTR udat,
117: LPOVERLAPPED olp, int err);
118: };
119:
120:
121: class FAIO_EXTERN winfile_io
122: : public waio_base, public demux::winfileio_control_block
123: {
124: public:
125: winfile_io()
126: : winfileio_control_block(NULL, NULL, 0, false){}
127:
128:
129: winfile_io(HANDLE f, void* buf, int len, bool read)
130: : winfileio_control_block(f, buf, len, read) {}
131:
132: virtual bool start_async_op(demux::demuxer& demux, flx_drv* drv, void* f);
133:
134: virtual void iocp_op_finished( DWORD nbytes, ULONG_PTR udat,
135: LPOVERLAPPED olp, int err);
136: };
137:
138:
139: }}
140:
141:
Start cpp section to faio/faio_winio.cpp[1
/1
]
1: #line 1343 "./lpsrc/flx_faio.pak"
2:
3:
4: using namespace flx::demux;
5: namespace flx { namespace faio {
6:
7:
8: bool
9: iocp_associator::start_async_op(demuxer& demux, flx_drv* drv, void* f)
10: {
11: iocp_demuxer* iod = (iocp_demuxer*)&demux;
12:
13: if(iod->associate_with_iocp((HANDLE)s, 0) != 0)
14: fprintf(stderr,"associate request failed - get result here!\n");
15:
16: return true;
17: }
18:
19: void
20: waio_base::iocp_op_finished( DWORD nbytes, ULONG_PTR udat,
21: LPOVERLAPPED olp, int err)
22: {
23:
24:
25:
26:
27: if(NO_ERROR != err)
28: fprintf(stderr,"catchall wakeup got error: %i (should store it)\n", err);
29:
30: success = (NO_ERROR == err);
31: fw.wake();
32: }
33:
34: bool
35: wasync_accept::start_async_op(demuxer& demux, flx_drv* drv, void* f)
36: {
37: RECORD_THREAD_INFO(fw);
38: return start_overlapped();
39: }
40:
41: void
42: wasync_accept::iocp_op_finished( DWORD nbytes, ULONG_PTR udat,
43: LPOVERLAPPED olp, int err)
44: {
45: waio_base::iocp_op_finished(nbytes, udat, olp, err);
46: }
47:
48:
49: bool
50: connect_ex::start_async_op(demux::demuxer& demux, flx_drv* drv, void* f)
51: {
52: RECORD_THREAD_INFO(fw);
53: return start_overlapped();
54: }
55:
56: void
57: connect_ex::iocp_op_finished( DWORD nbytes, ULONG_PTR udat,
58: LPOVERLAPPED olp, int err)
59: {
60: waio_base::iocp_op_finished(nbytes, udat, olp, err);
61: }
62:
63:
64: bool
65: wasync_transmit_file::start_async_op(demux::demuxer& demux, flx_drv* drv, void* f)
66: {
67: RECORD_THREAD_INFO(fw);
68: return start_overlapped();
69: }
70:
71: void
72: wasync_transmit_file::iocp_op_finished( DWORD nbytes, ULONG_PTR udat,
73: LPOVERLAPPED olp, int err)
74: {
75: waio_base::iocp_op_finished(nbytes, udat, olp, err);
76: }
77:
78: bool
79: wsa_socketio::start_async_op(demux::demuxer& demux, flx_drv* drv, void* f)
80: {
81: RECORD_THREAD_INFO(fw);
82: return start_overlapped();
83: }
84:
85:
86:
87:
88: void
89: wsa_socketio::iocp_op_finished( DWORD nbytes, ULONG_PTR udat,
90: LPOVERLAPPED olp, int err)
91: {
92:
93:
94:
95:
96:
97: ppb->bytes_written += nbytes;
98:
99:
100:
101:
102:
103: if(0 == nbytes || ppb->finished())
104: {
105:
106: waio_base::iocp_op_finished(nbytes, udat, olp, err);
107: }
108: else
109: {
110:
111:
112:
113: if(start_overlapped())
114: fprintf(stderr, "socketio restart finished! WHAT TO DO!?!\n");
115: }
116: }
117:
118:
119:
120: bool
121: winfile_io::start_async_op(demuxer& demux, flx_drv* drv, void* f)
122: {
123: RECORD_THREAD_INFO(fw);
124: return start_overlapped();
125: }
126:
127:
128:
129:
130:
131: void
132: winfile_io::iocp_op_finished( DWORD nbytes, ULONG_PTR udat,
133: LPOVERLAPPED olp, int err)
134: {
135:
136:
137:
138:
139:
140:
141:
142: pb.bytes_written += nbytes;
143:
144: waio_base::iocp_op_finished(nbytes, udat, olp, err);
145: }
146:
147: }}
148:
149:
Start felix section to lib/flx_faio.flx[1
/1
]
1: #line 1493 "./lpsrc/flx_faio.pak"
2:
3:
4: module Faio {
5: requires package "demux";
6: requires package "faio";
7:
8: open C_hack;
9:
10: proc faio_req[t](x:&t) {
11:
12:
13:
14:
15:
16:
17: val y : &address = reinterpret[&address] x;
18: svc (svc_general y);
19: }
20:
21:
22:
23:
24:
25:
26: proc get_thread(thread: ptr[fthread]) {
27: svc (svc_get_fthread thread );
28: }
29:
30: header stdlib_h = '#include <stdlib.h>';
31:
32:
33:
34:
35: fun malloc: int -> address = 'malloc($1)' requires stdlib_h;
36: proc free: address = 'free($1);' requires stdlib_h;
37:
38:
39:
40:
41:
42: header = '#include "faio_asyncio.hpp"';
43:
44: type sel_param = "flx::demux::sel_param";
45: type sel_param_ptr = "flx::demux::sel_param*";
46:
47:
48: fun to_ptr : sel_param -> sel_param_ptr = '&$1';
49:
50:
51: fun get_bytes_done : sel_param_ptr -> int = '$1->bytes_written';
52: proc init_pb : sel_param*address*int
53: = '{$1.buffer=(char*)$2;$1.buffer_size=$3;$1.bytes_written=0;}';
54:
55:
56:
57:
58: proc calc_eof(pb: sel_param_ptr, len: &int, eof: &bool)
59: {
60: var bytes_done = pb.bytes_done;
61: *eof = (bytes_done != *len);
62: *len = bytes_done;
63: }
64:
65:
66:
67: proc gc_collect_me_not_hack[t]: t = ';';
68:
69:
70:
71:
72:
73: type copipe = 'flx::faio::copipe_endpt*';
74:
75:
76: proc connect : copipe*fthread*sel_param*bool
77: = '$1->pipe->connect((void*)$2, &$3, $4, $1->get_channel($4));';
78: fun pipe_request : copipe -> address = '$1->pipe';
79: fun to_ptr : copipe -> address = '$1';
80: proc priv_shutdown : copipe*int = '$1->shutdown($2);';
81: proc priv_delete : copipe = 'delete $1;';
82:
83:
84:
85:
86: header 'typedef struct{ flx::faio::copipe_endpt* foo[2]; }sorry_awful;';
87: type co_awful = 'sorry_awful';
88: proc init_awful_pair : co_awful = 'flx::faio::copipe_endpt::pipe_pair($1.foo);';
89: fun get : co_awful*int -> copipe = '$1.foo[$2]';
90:
91: proc flx_copipe_pair(p1: &copipe, p2: &copipe)
92: {
93: var awful: co_awful;
94: init_awful_pair(awful);
95: *p1 = get(awful, 0);
96: *p2 = get(awful, 1);
97: }
98:
99:
100:
101:
102: private proc copipe_driver_request(pipe: copipe)
103: {
104: var request = pipe_request pipe;
105: faio_req$ &request;
106: }
107:
108: proc co_rw(pipe: copipe, len: &int, buf: address, eof: &bool, read_flag: bool)
109: {
110:
111:
112: var thread : fthread;
113: get_thread(addr thread);
114:
115: var pb: sel_param;
116: init_pb(pb, buf, *len);
117:
118:
119: connect(pipe, thread, pb, read_flag);
120:
121: copipe_driver_request(pipe);
122:
123: calc_eof(to_ptr(pb), len, eof);
124: }
125:
126:
127:
128:
129:
130: proc co_shutdown(pipe: copipe, how: int)
131: {
132: priv_shutdown(pipe, how);
133:
134: copipe_driver_request(pipe);
135: }
136:
137: proc co_close(pipe: copipe)
138: {
139: co_shutdown(pipe, 2);
140: priv_delete(pipe);
141:
142: }
143:
144: proc co_read(pipe: copipe, len: &int, buf: address, eof: &bool)
145: {
146: co_rw(pipe, len, buf, eof, true);
147: }
148:
149: proc co_write(pipe: copipe, len: &int, buf: address, eof: &bool)
150: {
151: co_rw(pipe, len, buf, eof, false);
152: }
153:
154:
155:
156: type sleep_request = 'flx::faio::sleep_request';
157: fun mk_sleep_request: double -> sleep_request = 'flx::faio::sleep_request($1)';
158:
159: proc sleep(delta: double)
160: {
161: var sr = mk_sleep_request delta;
162: faio_req$ &sr;
163: }
164:
165:
166: }
167:
Start felix section to lib/flx_faio_posix.flx[1
/1
]
1: #line 1661 "./lpsrc/flx_faio.pak"
2:
3:
4:
5: include "flx_faio";
6: module Faio_posix {
7: header faio_posixio_hpp = '#include "faio_posixio.hpp"';
8: requires package "demux";
9: requires package "faio";
10: open C_hack;
11: open Faio;
12:
13:
14: header unistd_h = '#include <unistd.h>';
15: header fcntl_h = '#include <fcntl.h>';
16: header sys_socket_h = '#include <sys/socket.h>';
17: header sockety_h = '#include "demux_sockety.hpp"';
18:
19: proc close: int = 'close($1);' requires unistd_h;
20: proc shutdown: int*int = 'shutdown($a);' requires sys_socket_h;
21:
22:
23:
24: fun aio_ropen: string -> int = 'open($1.c_str(), O_RDONLY | O_NONBLOCK, 0)'
25: requires fcntl_h;
26:
27:
28: fun ropen: charp -> int = 'open($1, O_RDONLY, 0)' requires fcntl_h;
29:
30:
31: header = '#include "faio_posixio.hpp"';
32:
33:
34: type socketio_request = "flx::faio::socketio_request";
35:
36: fun mk_socketio_request: int*address*int*bool -> socketio_request
37: = 'flx::faio::socketio_request($1, (char*)$2, $3, $4)';
38:
39: fun get_pb: socketio_request -> sel_param_ptr = '&$1.sv.pb';
40:
41:
42: proc async_rw(fd: int, len: &int, buf: address, eof: &bool, read_flag: bool)
43: {
44: var asyncb = mk_socketio_request(fd, buf, *len, read_flag);
45:
46:
47: faio_req$ &asyncb;
48:
49: calc_eof(asyncb.pb, len, eof);
50: }
51:
52: proc async_read(fd: int, len: &int, buf: address,
53: eof: &bool)
54: {
55: async_rw(fd, len, buf, eof, true);
56: }
57:
58: proc async_write(fd: int, len: &int, buf: address, eof: &bool)
59: {
60: async_rw(fd, len, buf, eof, false);
61: }
62:
63: type flxfileio_request = "flx::faio::flxfileio_request";
64:
65:
66: fun mk_faio: int*address*int*int*bool -> flxfileio_request
67: = 'flx::faio::flxfileio_request($1, (char*)$2, $3, $4, $5)';
68: fun get_pb: flxfileio_request -> sel_param_ptr = '&$1.pb';
69:
70: proc faio_rw(fd: int, len: &int, buf: address, eof: &bool, read_flag: bool)
71: {
72:
73: var faio = mk_faio(fd, buf, *len, 0, read_flag);
74: faio_req$ &faio;
75: calc_eof(faio.pb, len, eof);
76: }
77:
78: proc faio_read(fd: int, len: &int, buf: address,
79: eof: &bool)
80: {
81: faio_rw(fd, len, buf, eof, true);
82: }
83:
84: proc faio_write(fd: int, len: &int, buf: address, eof: &bool)
85: {
86: faio_rw(fd, len, buf, eof, false);
87: }
88:
89:
90: type async_connect = 'flx::faio::connect_request';
91:
92: fun mk_async_connect: charp*int -> async_connect = 'flx::faio::connect_request($a)';
93: fun get_socket: async_connect -> int = '$1.s';
94: fun get_err: async_connect -> int = '$1.socket_err';
95:
96:
97: proc connect(s: &int, addr: charp, port: int)
98: {
99: var ac = mk_async_connect(addr, port);
100: faio_req$ ∾
101: *s = ac.socket;
102: }
103:
104: type accept_request = "flx::faio::accept_request";
105:
106: fun mk_accept: int -> accept_request = 'flx::faio::accept_request($1)';
107: fun get_socket: accept_request -> int = '$1.accepted';
108:
109:
110: proc mk_listener: lvalue[int]*lvalue[int]*int
111: = '$1 = flx::demux::create_async_listener(&$2, $3);' requires sockety_h;
112:
113: proc accept(s: &int, listener: int)
114: {
115: var acc = mk_accept listener ;
116: faio_req$ &acc;
117: *s = acc.socket;
118: }
119:
120:
121:
122: }
123:
Start felix section to lib/flx_faio_win32.flx[1
/1
]
1: #line 1785 "./lpsrc/flx_faio.pak"
2:
3: include "flx_faio";
4:
5: module Faio_win32 {
6: requires package "demux";
7: requires package "faio";
8:
9: open C_hack;
10: open Faio;
11: header '#include "faio_winio.hpp"';
12:
13:
14: fun GetLastError: 1 -> int = 'GetLastError()';
15:
16: ctypes SOCKET;
17:
18:
19:
20:
21: fun cmk_socket : unit -> SOCKET = '::socket(AF_INET, SOCK_STREAM, IPPROTO_TCP)';
22:
23:
24:
25:
26: proc mk_socket(s: &SOCKET)
27: {
28: *s = cmk_socket();
29: associate_with_iocp(*s);
30: }
31:
32:
33: type wasync_accept = "flx::faio::wasync_accept";
34:
35: fun mk_accept: SOCKET*SOCKET -> wasync_accept = 'flx::faio::wasync_accept($a)';
36:
37: fun get_success[t]: t -> bool = '$1.success';
38:
39:
40: const INVALID_SOCKET: SOCKET = 'INVALID_SOCKET';
41:
42: fun eq : SOCKET*SOCKET -> bool = '($1 == $2)';
43:
44:
45: proc Accept(success: &bool, listener: SOCKET, accepted: SOCKET)
46: {
47: var acc = mk_accept(listener, accepted);
48:
49: faio_req$ &acc;
50:
51: *success = get_success(acc);
52: }
53:
54: type connect_ex="flx::faio::connect_ex";
55: fun mk_connect_ex: SOCKET*charp*int -> connect_ex = 'flx::faio::connect_ex($a)';
56:
57:
58:
59: proc Connect(success: &bool, s: SOCKET, addr: charp, port: int)
60: {
61: var con = mk_connect_ex(s, addr, port);
62: faio_req$ &con;
63: *success = get_success(con);
64: }
65:
66: proc Connect(s: &SOCKET, addr: charp, port: int)
67: {
68: mk_socket s;
69: var success: bool;
70: Connect(&success, *s, addr, port);
71:
72:
73: }
74:
75:
76: proc cmk_listener: lvalue[SOCKET]*lvalue[int]*int
77: = '$1 = flx::demux::create_listener_socket(&$2, $3);';
78:
79: proc mk_listener(listener: &SOCKET, port: &int, backlog: int)
80: {
81: *listener <- cmk_listener(*port, backlog);
82: associate_with_iocp(*listener);
83: }
84:
85:
86: proc closesocket: SOCKET = 'closesocket($1);';
87:
88: const SD_RECEIVE:int = 'SD_RECEIVE';
89: const SD_SEND:int = 'SD_SEND';
90: const SD_BOTH:int = 'SD_BOTH';
91:
92: proc shutdown: SOCKET*int = 'shutdown($1, $2);';
93:
94: type wasync_transmit_file = "flx::faio::wasync_transmit_file";
95:
96:
97: type WFILE = 'HANDLE';
98: const INVALID_HANDLE_VALUE: WFILE = 'INVALID_HANDLE_VALUE';
99: fun eq : WFILE*WFILE -> bool = '($1 == $2)';
100:
101:
102:
103:
104:
105:
106:
107:
108: proc OpenFile: lvalue[WFILE]*string =
109: '$1 = CreateFile($2.c_str(), FILE_READ_DATA, FILE_SHARE_READ, NULL,\
110: OPEN_EXISTING, FILE_ATTRIBUTE_NORMAL | FILE_FLAG_OVERLAPPED, NULL);';
111:
112:
113: proc OpenFileDuplex: lvalue[WFILE]*string =
114: '$1 = CreateFile($2.c_str(), FILE_READ_DATA | FILE_WRITE_DATA,\
115: FILE_SHARE_READ | FILE_SHARE_WRITE, NULL, OPEN_EXISTING,\
116: FILE_ATTRIBUTE_NORMAL | FILE_FLAG_OVERLAPPED, NULL);';
117:
118: proc CloseFile: WFILE = 'if(!CloseHandle($1))\
119: fprintf(stderr, "CloseHandle(WFILE) failed: %i\\n", GetLastError());';
120:
121:
122:
123:
124: fun mk_transmit_file : SOCKET*WFILE -> wasync_transmit_file
125: = 'flx::faio::wasync_transmit_file($a)';
126:
127:
128: proc TransmitFile(s: SOCKET, f: WFILE)
129: {
130: var tf = mk_transmit_file(s, f);
131: faio_req$ &tf;
132: }
133:
134:
135:
136:
137:
138:
139: fun mk_reuse_socket : SOCKET -> wasync_transmit_file
140: = 'flx::faio::wasync_transmit_file($a)';
141:
142: proc ReuseSocket(s: SOCKET)
143: {
144: var tf = mk_reuse_socket(s);
145: faio_req$ &tf;
146: }
147:
148: type wsa_socketio = "flx::faio::wsa_socketio";
149: fun mk_wsa_socketio: SOCKET*sel_param_ptr*bool->wsa_socketio = 'flx::faio::wsa_socketio($a)';
150:
151: proc WSARecv(s: SOCKET, len: &int, buf: address, eof: &bool)
152: {
153: var pb: sel_param;
154: init_pb(pb, buf, *len);
155: var ppb: sel_param_ptr = to_ptr pb;
156:
157: var rev = mk_wsa_socketio(s, ppb, true);
158: faio_req$ &rev;
159:
160: calc_eof(ppb, len, eof);
161: }
162:
163: proc WSASend(s: SOCKET, len: &int, buf: address, eof: &bool)
164: {
165: var pb: sel_param;
166: init_pb(pb, buf, *len);
167: var ppb: sel_param_ptr = to_ptr pb;
168:
169: var rev = mk_wsa_socketio(s, ppb, false);
170: faio_req$ &rev;
171: calc_eof(ppb, len, eof);
172: }
173:
174: type winfile_io = "flx::faio::winfile_io";
175:
176: fun mk_winfile_io: WFILE*address*int*bool->winfile_io = 'flx::faio::winfile_io($a)';
177:
178:
179: fun get_pb: winfile_io -> sel_param_ptr = '&$1.pb';
180:
181: proc ReadFile(f: WFILE, len: &int, buf: address, eof: &bool)
182: {
183: var io = mk_winfile_io(f, buf, *len, true);
184: faio_req$ &io;
185:
186: calc_eof(io.pb, len, eof);
187: }
188:
189: proc WriteFile(f: WFILE, len: &int, buf: address, eof: &bool)
190: {
191: var io = mk_winfile_io(f, buf, *len, false);
192: faio_req$ &io;
193: calc_eof(io.pb, len, eof);
194: }
195:
196:
197:
198:
199: type iocp_associator = "flx::faio::iocp_associator";
200: fun mk_iocp_associator: SOCKET -> iocp_associator = 'flx::faio::iocp_associator($1)';
201:
202:
203:
204:
205: proc associate_with_iocp(s: SOCKET)
206: {
207:
208: var req = mk_iocp_associator(s);
209: faio_req$ &req;
210: }
211:
212: }
213:
Start felix section to lib/flx_stream.flx[1
/1
]
1: #line 1999 "./lpsrc/flx_faio.pak"
2: include "flx_faio";
3: include "flx_faio_posix";
4: include "flx_faio_win32";
5:
6:
7: open Faio;
8:
9:
10: open Faio_posix;
11:
12:
13:
14: open Faio_win32;
15:
16:
17:
18: module Flx_stream {
19: requires package "demux";
20: requires package "faio";
21:
22: union flx_stream =
23: | CO of copipe
24: | DEVNULL
25: #if POSIX
26: | UFD of int
27: | USOCK of int
28: #endif
29:
30:
31: #if WIN32
32: | WSOCK of SOCKET
33: | WFD of WFILE
34: #endif
35:
36:
37:
38:
39:
40:
41: ;
42:
43:
44:
45:
46:
47:
48:
49:
50:
51:
52:
53:
54:
55:
56:
57:
58: proc flx_read(strm: flx_stream, len: &int, buf: address, eof: &bool)
59: {
60: match strm with
61: | CO ?pipe => { co_read(pipe, len, buf, eof); }
62: | DEVNULL => { *len = 0; *eof = true; }
63: #if POSIX
64: | USOCK ?fd => { async_read(fd, len, buf, eof); }
65: | UFD ?fd => { faio_read(fd, len, buf, eof); }
66: #endif
67:
68: #if WIN32
69: | WSOCK ?s => { WSARecv(s, len, buf, eof); }
70: | WFD ?file => { ReadFile(file, len, buf, eof); }
71: #endif
72: endmatch;
73: }
74:
75: proc flx_write(strm: flx_stream, len: &int, buf: address, eof: &bool)
76: {
77: match strm with
78: | CO ?pipe => { co_write(pipe, len, buf, eof); }
79: | DEVNULL => { /* nothing to do */ }
80: #if POSIX
81: | USOCK ?fd => { async_write(fd, len, buf, eof); }
82: | UFD ?fd => { faio_write(fd, len, buf, eof); }
83: #endif
84:
85: #if WIN32
86: | WSOCK ?s => { WSASend(s, len, buf, eof); }
87: | WFD ?file => { WriteFile(file, len, buf, eof); }
88: #endif
89: endmatch;
90: }
91:
92: proc flx_shutdown(strm: flx_stream, how: int)
93: {
94: match strm with
95: | CO ?pipe => { co_shutdown(pipe, how); }
96: | DEVNULL => { /* nothing to do? */ }
97: #if POSIX
98: | USOCK ?socket => { shutdown(socket, how); }
99: | UFD => { /* nuthin */ }
100: #endif
101:
102: #if WIN32
103: | WFD => { /* nothing to do */ }
104: | WSOCK ?socket => { shutdown(socket, how); }
105: #endif
106: endmatch;
107: }
108:
109: proc flx_close(strm: flx_stream)
110: {
111: match strm with
112: | CO ?pipe => { co_close(pipe); }
113: | DEVNULL => { /* nothing to do */ }
114: #if POSIX
115: | USOCK ?socket => { close(socket); }
116: | UFD ?fd => { close(fd); }
117: #endif
118:
119: #if WIN32
120: | WSOCK ?socket => { closesocket(socket); }
121: | WFD ?file => { CloseFile(file); }
122: #endif
123: endmatch;
124: }
125:
126: proc flx_popen(ourend: &flx_stream, p:flx_stream->0)
127: {
128: var a: copipe;
129: var b: copipe;
130:
131: flx_copipe_pair(&a, &b);
132: *ourend = CO a;
133: var theirend = CO b;
134: spawn_fthread { p theirend; };
135: }
136:
137:
138:
139:
140: proc cat(infd: flx_stream, outfd: flx_stream, buf: address, bufsize: int) {
141: var eof = false;
142: var weof = false;
143: var len: int;
144:
145:
146: while{not(eof) and not(weof)} {
147: len = bufsize;
148:
149: flx_read(infd, &len, buf, &eof);
150:
151: flx_write(outfd, &len, buf, &weof);
152: };
153: };
154:
155: proc cat(infd: flx_stream, outfd: flx_stream)
156: {
157: val BUFSIZE = 10*1024;
158: var buf = malloc(BUFSIZE);
159:
160:
161: cat(infd, outfd, buf, BUFSIZE);
162: free(buf);
163: }
164:
165: open List;
166:
167:
168:
169:
170:
171: proc cat(in_fds: list[flx_stream], outfd: flx_stream,
172: buf: address, bufsize: int)
173: {
174: match in_fds with
175: | Empty[flx_stream] => {}
176: | Cons[flx_stream] (?fd, ?l) =>
177: {
178: cat(fd, outfd, buf, bufsize);
179: cat(l, outfd, buf, bufsize);
180: }
181: endmatch
182: ;
183: }
184:
185:
186: proc echo(fd: flx_stream, buf: address, bufsize: int)
187: {
188:
189: cat(fd, fd, buf, bufsize);
190: };
191:
192:
193:
194:
195: proc tee(infd: flx_stream, outfd: flx_stream, outfd2: flx_stream)
196: {
197: var eof = false;
198: var weof = false;
199: var weof2 = false;
200: var len: int;
201:
202: val BUFSIZE = 10*1024;
203: var buf = malloc(BUFSIZE);
204:
205:
206: while{not(eof) and not(weof) and not(weof2)} {
207: len = BUFSIZE;
208: flx_read(infd, &len, buf, &eof);
209: flx_write(outfd, &len, buf, &weof);
210: flx_write(outfd2, &len, buf, &weof2);
211: };
212: free buf;
213: }
214:
215: }
216:
Start felix section to lib/flx_socket.flx[1
/1
]
1: #line 2216 "./lpsrc/flx_faio.pak"
2:
3: include "flx_faio";
4:
5:
6:
7: include "flx_faio_posix";
8: open Faio_posix;
9:
10:
11:
12: include "flx_faio_win32";
13: open Faio_win32;
14:
15:
16:
17: include "flx_stream";
18: open Flx_stream;
19:
20: module Flx_socket {
21:
22: #if POSIX
23:
24: typedef flx_listener = int;
25: typedef flx_socket = int;
26:
27: fun flx_socket_to_stream(s: flx_socket) : flx_stream => USOCK s;
28:
29: proc mk_flx_listener(l: &flx_listener, port: &int, qlen: int)
30: {
31:
32: var listener: int <- mk_listener(*port, qlen);
33: *l = listener;
34: }
35:
36: proc flx_accept(l: flx_listener, s: &flx_socket)
37: {
38: accept(s, l);
39: }
40:
41: #elif WIN32
42:
43: typedef flx_listener = SOCKET;
44: typedef flx_socket = SOCKET;
45:
46: fun flx_socket_to_stream(s: flx_socket) : flx_stream => WSOCK s;
47:
48: proc mk_flx_listener(l: &flx_listener, port: &int, qlen: int)
49: {
50:
51: var listener: SOCKET;
52: mk_listener(&listener, port, qlen);
53: *l = listener;
54: }
55:
56: proc flx_accept(l: flx_listener, s: &flx_socket)
57: {
58: var success: bool;
59:
60: mk_socket(s);
61:
62:
63: Accept(&success, l, *s);
64:
65:
66: if success then {} else {fprint (cout, "Accept failed! num?\n"); } endif;
67: }
68:
69: #endif
70:
71: proc flx_read(s: flx_socket, len: &int, buf: address, eof: &bool)
72: {
73: #if POSIX
74: async_read(s, len, buf, eof);
75: #elif WIN32
76: WSARecv(s, len, buf, eof);
77: #endif
78: }
79:
80: proc flx_write(s: flx_socket, len: &int, buf: address, eof: &bool)
81: {
82: #if POSIX
83: async_write(s, len, buf, eof);
84: #elif WIN32
85: WSASend(s, len, buf, eof);
86: #endif
87: }
88:
89:
90: proc flx_shutdown(s: flx_socket, how: int)
91: {
92: #if POSIX
93: shutdown(s, how);
94: #elif WIN32
95: shutdown(s, how);
96: #endif
97: }
98:
99: proc flx_close(s: flx_socket)
100: {
101: #if POSIX
102: close(s);
103: #elif WIN32
104: closesocket(s);
105: #endif
106: }
107:
108:
109: #if WIN32
110: proc flx_connect_win32(strm: &flx_socket, addr: charp, port: int)
111: {
112: var s: SOCKET;
113: Connect( &s, addr, port );
114:
115: *strm = s;
116: }
117: #endif
118:
119: #if POSIX
120: proc flx_connect_posix(strm: &flx_socket, addr: charp, port: int)
121: {
122: var s: int;
123: connect( &s, addr, port );
124:
125: *strm = s;
126: }
127: #endif
128:
129:
130: proc flx_connect(s: &flx_socket, addr: charp, port: int)
131: {
132: #if WIN32
133: flx_connect_win32(s, addr, port);
134: #else
135: flx_connect_posix(s, addr, port);
136: #endif
137: }
138:
139:
140:
141:
142:
143:
144:
145:
146:
147:
148: proc get_line(strm: flx_socket, s: &string)
149: {
150: var c: char;
151: val ac = C_hack::address_of(c);
152: var str: string;
153: var finished = false;
154:
155: while{not finished}
156: {
157: var len = 1;
158: var eof: bool;
159:
160: flx_read(strm, &len, ac, &eof);
161:
162:
163: if eof or c == char '\n' then
164: {
165:
166: finished = true;
167: }
168: else
169: {
170:
171: str += c;
172:
173: } endif;
174:
175: };
176:
177: *s = str;
178:
179: }
180:
181: fun str_addr: string -> address = "(void*)$1.c_str()";
182:
183: proc write_string(strm: flx_socket, s: string)
184: {
185:
186:
187: var bugger = s;
188:
189: var slen = len (bugger);
190: var addr = str_addr(bugger);
191: var eof: bool;
192:
193: flx_write(strm, &slen, addr, &eof);
194: }
195: }
196: