1: #line 4 "./lpsrc/flx_posixio.ipk"
2:
3:
4:
5:
6:
7:
8:
9:
10:
11:
12:
13:
14:
15:
16:
17:
18:
19:
20: namespace flx { namespace faio {
21:
22: class FAIO_EXTERN socketio_wakeup : public demux::socket_wakeup {
23: public:
24: demux::sel_param pb;
25: int sio_flags;
26: struct socketio_request *request;
27:
28: virtual void wakeup(demux::posix_demuxer& demux);
29: };
30:
31:
32:
33:
34:
35: class FAIO_EXTERN socketio_request : public flx_driver_request_base {
36: public:
37: socketio_wakeup sv;
38: demux::posix_demuxer *pd;
39: socketio_request() {}
40: socketio_request(socketio_request const&);
41: void operator = (socketio_request const&);
42:
43: socketio_request(demux::posix_demuxer *pd_a, int s, char* buf, long len, bool r);
44: bool start_async_op_impl();
45: };
46:
47:
48: class FAIO_EXTERN connect_request
49: : public flx_driver_request_base, public demux::connect_control_block {
50: public:
51: demux::posix_demuxer *pd;
52: connect_request() {}
53:
54: connect_request(demux::posix_demuxer *pd_a,const char* addr, int port);
55: bool start_async_op_impl();
56: virtual void wakeup(demux::posix_demuxer&);
57: };
58:
59:
60: class FAIO_EXTERN accept_request
61: : public flx_driver_request_base, public demux::accept_control_block {
62: public:
63:
64:
65:
66:
67: demux::posix_demuxer *pd;
68: accept_request() {}
69:
70:
71: accept_request(demux::posix_demuxer *pd_a, int listener) : pd(pd_a) { s = listener; }
72:
73:
74: bool start_async_op_impl();
75:
76:
77: virtual void wakeup(demux::posix_demuxer& demux);
78: };
79:
80:
81:
82:
83: class FAIO_EXTERN flxfileio_request
84: : public flx_driver_request_base, public demux::fileio_request
85: {
86: pthread::worker_fifo *aio_worker;
87: public:
88: flxfileio_request();
89: ~flxfileio_request();
90:
91: flxfileio_request(
92: pthread::worker_fifo *a,
93: int f, char* buf, long len, long off, bool rd
94: )
95: : fileio_request(f, buf, len, off, rd), aio_worker(a)
96: {
97:
98: }
99:
100:
101: bool start_async_op_impl();
102: void finished();
103: };
104:
105: }}
106:
107:
1: #line 112 "./lpsrc/flx_posixio.ipk"
2:
3:
4:
5:
6:
7:
8:
9:
10:
11:
12:
13: using namespace flx::demux;
14: namespace flx { namespace faio {
15:
16: connect_request::connect_request(demux::posix_demuxer *pd_a,const char* addr, int port) :pd(pd_a) { addy = addr; p = port; s=-1; }
17:
18: socketio_request::socketio_request(demux::posix_demuxer *pd_a, int s, char* buf, long len, bool read)
19: : pd(pd_a)
20: {
21:
22: sv.s = s;
23: sv.request = this;
24:
25: sv.sio_flags = ((read) ? PDEMUX_READ : PDEMUX_WRITE);
26:
27: sv.pb.buffer = buf;
28: sv.pb.buffer_size = len;
29: sv.pb.bytes_written = 0;
30: }
31:
32: socketio_request::socketio_request(socketio_request const &a) : pd(a.pd)
33: {
34:
35: sv = a.sv;
36: sv.request = this;
37: }
38:
39:
40: void socketio_request::operator=(socketio_request const &a)
41: {
42:
43:
44: flx_driver_request_base::operator=(a);
45: sv = a.sv;
46: sv.request = this;
47: pd = a.pd;
48: }
49:
50: bool
51: socketio_request::start_async_op_impl()
52: {
53:
54:
55:
56:
57:
58: bool failed = (pd->add_socket_wakeup(&sv, sv.sio_flags) == -1);
59: if (failed)
60: fprintf(stderr,"socketio_request FAILED %p, sock=%d, dir=%d\n",this, sv.s, sv.sio_flags);
61:
62:
63: return failed;
64: }
65:
66:
67: void
68: socketio_wakeup::wakeup(posix_demuxer& demux)
69: {
70:
71:
72: bool connection_closed;
73:
74:
75:
76:
77:
78:
79:
80:
81:
82:
83: if(wakeup_flags & PDEMUX_ERROR)
84: {
85: connection_closed = true;
86:
87: }
88:
89: else if(wakeup_flags & PDEMUX_EOF)
90: {
91: connection_closed = true;
92:
93: }
94:
95: else if(wakeup_flags & PDEMUX_READ)
96: {
97:
98: assert(wakeup_flags == PDEMUX_READ);
99: connection_closed = posix_demuxer::socket_recv(s, &pb);
100: }
101: else
102: {
103:
104: assert(wakeup_flags == PDEMUX_WRITE);
105: connection_closed = posix_demuxer::socket_send(s, &pb);
106: }
107:
108:
109:
110:
111:
112:
113: if(connection_closed || pb.bytes_written == pb.buffer_size)
114: {
115:
116: request->notify_finished();
117: return;
118: }
119:
120:
121: if(demux.add_socket_wakeup(this, sio_flags) == -1)
122: fprintf(stderr,"failed to re-add_socket_wakeup\n");
123: }
124:
125:
126: bool
127: connect_request::start_async_op_impl()
128: {
129:
130:
131:
132: if(start(*pd) == -1) {
133: fprintf(stderr, "FAILED TO SPAWN CONNECT REQUEST\n");
134: return true;
135: }
136:
137:
138:
139:
140: return false;
141:
142: /*
143:
144: if(0 == socket_err) fprintf(stderr, "WOW, instant CONNECT\n");
145:
146:
147:
148: return 0 == socket_err;
149: */
150: }
151:
152: void
153: connect_request::wakeup(posix_demuxer& demux)
154: {
155:
156:
157:
158: connect_control_block::wakeup(demux);
159:
160:
161: notify_finished();
162: }
163:
164:
165:
166: bool
167: accept_request::start_async_op_impl()
168: {
169:
170: bool failed = (start(*pd) == -1);
171: if(failed)
172: fprintf(stderr, "FAILED TO SPAWN ACCEPT REQUEST\n");
173:
174:
175: return failed;
176: }
177:
178: void
179: accept_request::wakeup(posix_demuxer& demux)
180: {
181:
182: accept_control_block::wakeup(demux);
183:
184: if(accepted == -1)
185: {
186:
187: fprintf(stderr, "accept request failed (%i), retrying...\n",
188: socket_err);
189:
190: if(start(demux) == -1)
191: fprintf(stderr, "failed again... probably was a bad idea\n");
192: return;
193: }
194:
195: notify_finished();
196: }
197:
198:
199: flxfileio_request::~flxfileio_request(){}
200: flxfileio_request::flxfileio_request(){}
201:
202:
203: bool
204: flxfileio_request::start_async_op_impl()
205: {
206:
207:
208:
209:
210:
211: aio_worker->add_worker_task(this);
212:
213: return false;
214: }
215:
216: void
217: flxfileio_request::finished() { notify_finished(); }
218: }}
219:
1: #line 332 "./lpsrc/flx_posixio.ipk"
2:
3:
4:
5: include "pthread";
6: include "flx_faio";
7: include "flx_demux";
8:
9: module Faio_posix {
10: header faio_posixio_hpp = '#include "faio_posixio.hpp"';
11: requires package "demux";
12: requires package "faio";
13: open C_hack;
14: open Faio;
15: open Pthread;
16: open Demux;
17:
18: header unistd_h = '#include <unistd.h>';
19: header fcntl_h = '#include <fcntl.h>';
20: header sys_stat_h = '#include <fcntl.h>';
21: header sys_socket_h = '#include <sys/socket.h>';
22: header sockety_h = '#include "demux_sockety.hpp"';
23: header = '#include "faio_posixio.hpp"';
24:
25:
26: type fd_t = "int";
27: fun invalid: fd_t -> bool="$1==-1";
28:
29: instance Str[fd_t] {
30: fun str: fd_t -> string = "flx::rtl::strutil::str<int>($1)" requires flx_strutil;
31: }
32:
33: instance Str[socket_t] {
34: fun str: socket_t -> string = "flx::rtl::strutil::str<int>($1)" requires flx_strutil;
35: }
36:
37: proc close: socket_t = 'close($1);' requires unistd_h;
38: proc close: fd_t = 'close($1);' requires unistd_h;
39:
40: proc shutdown: socket_t*int = 'shutdown($a);' requires sys_socket_h;
41:
42: fun bad_socket : socket_t -> bool = "$1 == -1";
43:
44: type posix_permissions = "mode_t" requires sys_stat_h;
45: const S_IRUSR : posix_permissions;
46: const S_IWUSR : posix_permissions;
47: const S_IXUSR : posix_permissions;
48: const S_IRGRP : posix_permissions;
49: const S_IWGRP : posix_permissions;
50: const S_IXGRP : posix_permissions;
51: const S_IROTH : posix_permissions;
52: const S_IWOTH : posix_permissions;
53: const S_IXOTH : posix_permissions;
54:
55:
56: /*
57: gen aio_ropen: string -> fd_t = 'open($1.c_str(), O_RDONLY | O_NONBLOCK)'
58: requires fcntl_h, sys_stat_h;
59: gen aio_wopen: string -> fd_t = ' open($1.c_str(), O_WRONLY | O_NONBLOCK | O_CREAT | O_TRUNC, S_IRUSR|S_IWUSR)'
60: requires fcntl_h, sys_stat_h;
61: gen aio_rwopen: string -> fd_t = ' open($1.c_str(), O_RDWR | O_NONBLOCK | O_CREAT | O_TRUNC, S_IRUSR|S_IWUSR)'
62: requires fcntl_h, sys_stat_h;
63: gen aio_creat: string * posix_permissions-> fd_t = ' open($1.c_str(), O_RDWR | O_NONBLOCK | O_CREAT | O_TRUNC, $2)'
64: requires fcntl_h, sys_stat_h;
65: */
66:
67:
68:
69: gen ropen: string -> fd_t = 'open($1.data(), O_RDONLY,0)' requires fcntl_h, sys_stat_h;
70: gen wopen: string -> fd_t = 'open($1.data(), O_WRONLY | O_CREAT | O_TRUNC, S_IRUSR | S_IWUSR)' requires fcntl_h, sys_stat_h;
71: gen rwopen: string -> fd_t = 'open($1.data(), O_RDWR,0)' requires fcntl_h, sys_stat_h;
72: gen creat: string * posix_permissions -> fd_t = 'open($1.data(), O_WRONLY | O_CREAT | O_TRUNC, $2)' requires fcntl_h, sys_stat_h;
73:
74: fun access: string -> posix_permissions = "get_perm($1.data())"
75: requires body """
76: mode_t get_perm(char const *f)
77: {
78: struct stat b;
79: stat(f,&b);
80: return b.st_mode;
81: }
82: """
83: ;
84:
85: fun access: fd_t -> posix_permissions = "get_perm($1)"
86: requires body """
87: mode_t get_perm(int f)
88: {
89: struct stat b;
90: fstat(f,&b);
91: return b.st_mode;
92: }
93: """
94: ;
95:
96: type socket_t = "int";
97:
98: type socketio_request = "flx::faio::socketio_request";
99:
100: gen mk_socketio_request: demuxer * socket_t*address*int*bool -> socketio_request
101: = 'flx::faio::socketio_request($1, $2, (char*)$3, $4, $5)';
102:
103: fun get_pb: socketio_request -> sel_param_ptr = '&$1.sv.pb';
104:
105:
106: proc async_rw(fd: socket_t, len: &int, buf: address, eof: &bool, read_flag: bool)
107: {
108: var asyncb = mk_socketio_request(sys_demux,fd, buf, *len, read_flag);
109: faio_req$ &asyncb;
110: calc_eof(asyncb.pb, len, eof);
111: }
112:
113: proc async_read(fd: socket_t, len: &int, buf: address,
114: eof: &bool)
115: {
116: async_rw(fd, len, buf, eof, true);
117: }
118:
119: proc async_write(fd: socket_t, len: &int, buf: address, eof: &bool)
120: {
121: async_rw(fd, len, buf, eof, false);
122: }
123:
124: type flxfileio_request = "flx::faio::flxfileio_request";
125:
126:
127: type async_connect = 'flx::faio::connect_request';
128:
129: fun mk_async_connect: demuxer * charp*int-> async_connect = 'flx::faio::connect_request($a)';
130: fun get_socket: async_connect -> socket_t = '$1.s';
131: fun get_err: async_connect -> int = '$1.socket_err';
132:
133:
134: proc connect(s: &socket_t, addr: charp, port: int)
135: {
136: var ac = mk_async_connect(sys_demux,addr, port);
137: faio_req$ ∾
138: *s = ac.socket;
139: }
140:
141: type accept_request = "flx::faio::accept_request";
142:
143: fun mk_accept: demuxer * socket_t -> accept_request = 'flx::faio::accept_request($1,$2)';
144: fun get_socket: accept_request -> int = '$1.accepted';
145:
146:
147: proc cmk_listener: lvalue[socket_t]*lvalue[int]*int
148: = '$1 = flx::demux::create_async_listener(&$2, $3);' requires sockety_h;
149:
150: proc mk_listener(s: &socket_t, port:&int, backlog:int)
151: {
152: cmk_listener(*s,*port, backlog);
153: }
154:
155: proc accept(s: &socket_t, listener: socket_t)
156: {
157: var acc = mk_accept$ sys_demux,listener;
158: faio_req$ &acc;
159: *s = acc.socket;
160: }
161:
162:
163:
164:
165: fun mk_faio: job_queue * fd_t*address*int*int*bool -> flxfileio_request
166: = 'flx::faio::flxfileio_request($1,$2, (char*)$3, $4, $5, $6)';
167: fun get_pb: flxfileio_request -> sel_param_ptr = '&$1.pb';
168:
169: proc faio_rw(q:job_queue, fd: fd_t, len: &int, buf: address, eof: &bool, read_flag: bool)
170: {
171:
172: var faio = mk_faio(q, fd, buf, *len, 0, read_flag);
173: faio_req$ &faio;
174:
175: calc_eof(faio.pb, len, eof);
176: }
177:
178:
179: val qbound = 20;
180: val nthreads = 4;
181: val sys_job_queue = Pthread::mk_job_queue(qbound,nthreads);
182:
183: proc faio_read(fd: fd_t, len: &int, buf: address,
184: eof: &bool)
185: {
186: faio_rw(sys_job_queue, fd, len, buf, eof, true);
187: }
188:
189: proc faio_write(fd: fd_t, len: &int, buf: address, eof: &bool)
190: {
191: faio_rw(sys_job_queue, fd, len, buf, eof, false);
192: }
193:
194:
195:
196: }
197:
198: