6. Felix Async IO tests
Start felix section to test/faio_01.flx[1
/1
]
1: #line 2415 "./lpsrc/flx_faio.pak"
2: // TCP stream round-trip test using Flx_socket: the main thread accepts one
3: // connection from a connector fthread on 127.0.0.1; each side writes one
4: // line and prints the line it reads.
5: include "flx_socket";
6: open Flx_socket;
7:
8: print "flx tcp stream test\n";
9:
10:
11: var listener: flx_listener;
12: var port = 0;
13:
14: // port starts at 0 and is passed by address; presumably mk_flx_listener
15: // writes the port it actually bound back into it -- TODO confirm in flx_socket.
16: mk_flx_listener(&listener, &port, 1);
17:
18:
19:
20: print "spawning connector\n";
21:
22:
23: // Connector fthread: connect, read one line, print it, reply, close.
24: spawn_fthread
25: {
26: {
27:
28: var c: flx_socket;
29: flx_connect(&c, c"127.0.0.1", port);
30: var str: string;
31:
32: get_line(c, &str);
33: print "connector got "; print str; endl;
34: write_string(c, "thanks\n");
35:
36: flx_close(c);
37: };
38: };
39: // Server side: accept a single connection, then close the listener.
40: var s: flx_socket;
41: flx_accept(listener, &s);
42: flx_close(listener);
43:
44: print "got connection\n";
45: write_string(s, "server says hi\n");
46: // Read the connector's reply and print it before closing the socket.
47: var str: string;
48: get_line(s, &str);
49:
50: print "server got "; print str; endl;
51: flx_close(s);
52:
Start felix section to test/posix_t1.flx[1
/1
]
1: #line 2468 "./lpsrc/flx_faio.pak"
2: // Accept/connect test for Faio_posix: exit status 0 if accept yields a socket, 1 if it yields -1.
3: include "flx_faio_posix";
4: open Faio_posix;
5:
6:
7:
8:
9: print "felix posix accept/connect test\n";
10:
11: var port = 0;
12: print "creating listener\n";
13: var listener: int <- mk_listener(port, 1);
14:
15: print "spawning connector\n";
16:
17: // Connector fthread only connects to 127.0.0.1 on port; it sends nothing and never closes c.
18: spawn_fthread{ { var c: int; connect(&c, c"127.0.0.1", port); }; };
19: // Accept one connection; s == -1 is treated as failure.
20: var s: int;
21: accept (&s, listener);
22: if s == -1 then {
23: System::exit 1;
24: } else {
25: print "accepted connection\n";
26: System::exit 0;
27: } endif;
28:
Start felix section to test/posix_t2.flx[1
/1
]
1: #line 2497 "./lpsrc/flx_faio.pak"
2: // Byte-exchange test for Faio_posix: an acceptor and a connector fthread swap 8-character messages.
3: include "flx_faio_posix";
4: open Faio_posix;
5: // tstbuf wraps a C struct holding char dat[8]; dprint prints at most 8 chars of it; get_data yields the buffer address.
6: header "typedef struct { char dat[8]; } tstbuf;";
7: ctypes tstbuf;
8: proc dprint: tstbuf = 'printf("%.8s", $1.dat);';
9: fun get_data: tstbuf -> address = "$1.dat";
10: fun get_data: charp -> address = "$1";
11:
12:
13: var port = 0;
14: var listener: int <- mk_listener(port, 1);
15:
16: // Connector fthread: write "faio2you" (len = 8), call shutdown(c, 1), then
17: // read the reply into b, print it and exit the process with status 0.
18: spawn_fthread{
19: {
20: var c: int;
21: connect(&c, c"127.0.0.1", port);
22:
23: var len = 8;
24: var eof: bool;
25: async_write(c, &len, get_data((c"faio2you")), &eof);
26: shutdown(c, 1);
27:
28: var b: tstbuf;
29: async_read(c, &len, b.data, &eof);
30: print "connector read "; dprint b; endl;
31: System::exit 0;
32: };
33: };
34: // NOTE(review): the acceptor requests len = 16 but tstbuf holds only 8 chars; this relies on the peer sending 8 bytes and then shutting down -- confirm this is intended.
35: var s: int;
36: accept (&s, listener);
37: var b: tstbuf;
38: var len = 16;
39: var eof: bool;
40: async_read(s, &len, b.data, &eof);
41: print "acceptor read "; print len; print " bytes: "; dprint b; endl;
42: async_write(s, &len, get_data((c"thanks!!")), &eof);
43:
Start felix section to test/posix_t3.flx[1
/1
]
1: #line 2541 "./lpsrc/flx_faio.pak"
2: // Minimal script: includes flx_stream and prints a single line.
3: // NOTE(review): the matching posix_t3.expect section in this document is empty
4: // although this script prints text -- confirm which one is current.
5: include "flx_stream";
6: print "more output here\n";
7:
Start felix section to test/win_t1.flx[1
/1
]
1: #line 2549 "./lpsrc/flx_faio.pak"
2: // Accept/connect test for Faio_win32: exit status 0 when Accept reports success, 1 otherwise.
3: include "flx_faio_win32";
4: open Faio_win32;
5: // Unlike the posix tests, which start from port 0, this one uses the fixed port 1234.
6: var port = 1234;
7: var listener: SOCKET;
8: mk_listener(&listener, &port, 1);
9:
10: print "spawning connector\n";
11:
12: // Connector fthread only connects to 127.0.0.1 on port.
13: spawn_fthread{
14: {
15: var c: SOCKET;
16: Connect(&c, c"127.0.0.1", port);
17: };
18: };
19: // The socket s is created with mk_socket first and then passed to Accept together with the listener.
20: var s: SOCKET;
21: var success: bool;
22: mk_socket(&s);
23: Accept(&success, listener, s);
24:
25: if success then {
26: print "successful accept!\n";
27: System::exit 0;
28: } else {
29: print "accept failed!\n";
30: System::exit 1;
31: } endif;
32:
33:
Start felix section to test/win_t2.flx[1
/1
]
1: #line 2583 "./lpsrc/flx_faio.pak"
2: // NOTE(review): win_t1 includes "flx_faio_win32" (lower-case f); the capitalised name below only resolves on a case-insensitive filesystem -- confirm the real file name.
3: include "Flx_faio_win32";
4: open Faio_win32;
5:
6: // Named-pipe test: a server end is created here and a client fthread opens
7: // the same pipe name; the two ends then exchange one byte each way.
8: // pname is a raw string, so the backslashes are taken literally.
9: var pname = r"\\.\pipe\flx_pipe";
10:
11:
12:
13:
14:
15: // Binding for the Win32 CreateNamedPipe call: stores the resulting handle in
16: // the first (lvalue) argument. The pipe is opened duplex, overlapped and
17: // byte-typed, with the numeric arguments 1, 256, 256, 0 and NULL security.
18: proc CreateNamedPipe: lvalue[WFILE]*string =
19: '$1 = CreateNamedPipe($2.c_str(), PIPE_ACCESS_DUPLEX | FILE_FLAG_OVERLAPPED,\
20: PIPE_TYPE_BYTE, 1, 256, 256, 0, NULL);';
21:
22:
23:
24: print "Creating named pipe "; print pname; endl;
25: var pipe: WFILE <- CreateNamedPipe(pname);
26:
27: // Failure is only reported; the script carries on with the invalid handle.
28: if pipe == INVALID_HANDLE_VALUE then
29: {
30: print "BUGGER: CreateNamedPipeFailed: "; print (GetLastError()); endl;
31: } else {
32: print "whoo!\n";
33: } endif;
34:
35:
36:
37: // associate_with_iocp is called with a SOCKET, so the pipe handle is cast
38: // to SOCKET at the C level before being passed in.
39: fun HACK_TO_SOCKET: WFILE -> SOCKET = '(SOCKET)$1';
40: associate_with_iocp(HACK_TO_SOCKET(pipe));
41: // C++ request class wrapping an overlapped ConnectNamedPipe call. NOTE(review): start_async_op names its last parameter f but passes fw to RECORD_THREAD_INFO -- confirm against the macro definition.
42: header = """
43:
44: using namespace flx;
45: using namespace demux;
46: using namespace faio;
47:
48: // wrap up ConnectNamedPipe. This function normally blocks, so we must
49: // use it in a way that allows us to deschedule the fthread.
50: class connect_namedpipe
51: : public waio_base, public flx::demux::iocp_wakeup {
52: HANDLE pipe; // for demux
53: public:
54: connect_namedpipe(HANDLE p = INVALID_HANDLE_VALUE) : pipe(p) {}
55:
56: // this belongs in demux class
57: virtual bool start_overlapped()
58: {
59: fprintf(stderr, "connect named pipe start overlapped %p\\n", pipe);
60:
61: clear_overlapped(); // everyone's doing this
62:
63: BOOL success;
64: success = ConnectNamedPipe(pipe, &ol);
65:
66: // fprintf(stderr, "Connect named pipe: %i\\n", success);
67: if(success)
68: {
69: // this shouldn't happen ever. make it an error.
70: fprintf(stderr, "ConnectNamedPipe SUCCEEDED (shouldn't happen)\\n");
71: } else {
72: int err = GetLastError();
73: fprintf(stderr, "ConnectNamedPipe returned %i\\n", err);
74:
75: // this doesn't always signify failure.
76: switch(err)
77: {
78: case ERROR_PIPE_CONNECTED: // got a connection already
79: fprintf(stderr, "ALREADY GOT CONNECTION\\n");
80: // do fake wakeup here for great greatness. eats user cookie.
81: iocp_op_finished(0, 0, &ol, NO_ERROR);
82: return true; // async finished
83: break;
84:
85: case ERROR_IO_PENDING: // completion packet is in the mail
86: fprintf(stderr, "Connection pending... (normal)\\n");
87: return false; // not finished, packet still to come
88: break;
89: default:
90: fprintf(stderr, "ConnectNamedPipe FAILED (%i)\\n", err);
91: break;
92: }
93:
94: }
95:
96: return false; // let's not get woken, packet still to come (\?)
97: }
98:
99: // this belongs in faio class
100: virtual bool start_async_op(demux::demuxer& demux, flx_drv* drv, void* f)
101: {
102: fprintf(stderr, "start_async_op for named pipe\\n");
103: RECORD_THREAD_INFO(fw); // records enough info for wakeup
104: return start_overlapped();
105: }
106:
107: // as does this
108: virtual void iocp_op_finished(DWORD nbytes, ULONG_PTR udat,
109: LPOVERLAPPED olp, int err)
110: {
111: fprintf(stderr, "connect named pipe FINISHED! err: %i\\n", err);
112: waio_base::iocp_op_finished(nbytes, udat, olp, err);
113: }
114:
115:
116: };
117: """;
118: // getbyte: issue a one-byte ReadFile on f into a local char and store that char through outc.
119: proc getbyte(f: WFILE, outc: &char)
120: {
121: var c: char;
122: val ac = C_hack::address_of(c);
123: var len = 1;
124: var eof: bool;
125: // len and eof are passed by address but neither is inspected afterwards.
126: ReadFile(f, &len, ac, &eof);
127: *outc = c;
128: }
129: // putbyte: issue a one-byte WriteFile on f, sending a local copy of c.
130: proc putbyte(f: WFILE, c: char)
131: {
132: var copyc = c;
133: val ac = C_hack::address_of(copyc);
134: var len = 1;
135: var eof: bool;
136: // len and eof are passed by address but neither is inspected afterwards.
137: WriteFile(f, &len, ac, &eof);
138: }
139:
140:
141:
142: // Client fthread: open the pipe by name, read one byte, print it, send 'b' back, close.
143: spawn_fthread {
144:
145:
146:
147: var p: WFILE <- OpenFileDuplex(pname);
148:
149:
150: // An invalid handle is only reported; the client continues regardless.
151: if p == INVALID_HANDLE_VALUE then
152: { print "BUGGER: client couldn't open pipe: "; print (GetLastError()); endl; }
153: else { print "client opened pipe\n"; } endif;
154:
155: // Same handle-to-SOCKET cast as used for the server end of the pipe.
156: print "associating client\n";
157: associate_with_iocp(HACK_TO_SOCKET(p));
158:
159: var c: char;
160: getbyte(p, &c);
161: print "client got "; print c; endl;
162: putbyte(p, char 'b');
163: CloseFile(p);
164: };
165: // Server side: wrap the pipe handle in a connect_namedpipe request, pass the request's address to Faio::faio_req, then exchange one byte each way with the client.
166: type np_request = 'connect_namedpipe';
167: fun mk_np_request: WFILE -> np_request = 'connect_namedpipe($1)';
168:
169: var npr = mk_np_request(pipe);
170: Faio::faio_req$ &npr;
171: // presumably the request above suspends this fthread until the connect completes -- confirm in Faio
172: print "poot! got connection (maybe)\n";
173: print "server trying to put byte\n";
174: putbyte(pipe, char 'a');
175: var c: char;
176: getbyte(pipe, &c);
177: print "server got "; print c; endl;
178: CloseFile(pipe);
179:
Start data section to test/faio_01.expect[1
/1
]
1: flx tcp stream test
2: spawning connector
3: got connection
4: connector got server says hi
5: server got thanks
Start data section to test/posix_t1.expect[1
/1
]
1: felix posix accept/connect test
2: creating listener
3: spawning connector
4: accepted connection
Start data section to test/posix_t2.expect[1
/1
]
1: acceptor read 8 bytes: faio2you
2: connector read thanks!!
Start data section to test/posix_t3.expect[1
/1
]
Start data section to test/win_t1.expect[1
/1
]
1: spawning connector
2: successful accept!
Start data section to test/win_t2.expect[1
/1
]
1: Creating named pipe \\.\pipe\flx_pipe
2: whoo!
3: client opened pipe
4: associating client
5: poot! got connection (maybe)
6: server trying to put byte
7: client got a
8: server got b
Start felix section to test/demo_webserver.flx[1
/1
]
1: #line 2799 "./lpsrc/flx_faio.pak"
2:
3:
4:
5:
6:
7:
8:
9:
10:
11: include "flx_socket";
12: open Flx_socket;
13: // C++ helpers emitted into the generated code: pull the URL out of an HTTP GET request line and split it into host part (base) and path part (file).
14: header = """
15: string
16: getline_to_url(const string& get)
17: {
18: // chomp off "GET " (should check it)
19: if(get.length() < 4) return "";
20:
21: string::size_type pos = get.substr(4).find(' ');
22: // pos must be size_type: an unsigned int cannot hold npos where size_t is wider
23: if(pos == string::npos) return "";
24:
25: return get.substr(4, pos);
26: }
27:
28: // split url into base and file name http://foo.com/file.html
29: // -> http://foo.com + file.html. failure returns nothing.
30: bool
31: split_url(const string& inurl, string& base, string& file)
32: {
33: // munch leading http:// if present
34: string url;
35: if(inurl.length() >= 7 && inurl.substr(0, 7) == "http://")
36: url = inurl.substr(7);
37: else
38: url = inurl;
39:
40: string::size_type pos = url.find('/');
41:
42: if(string::npos == pos) return false; // all bad
43:
44: base = url.substr(0, pos);
45: file = url.substr(pos+1);
46: return true; // all good
47: }
48:
49: bool
50: split_getline(const string& get, string& base, string& file)
51: {
52: return split_url(getline_to_url(get), base, file);
53: }
54: """;
55: // parse_get_line(line, ok, base, file): calls the C++ split_getline on line, storing its bool result in ok and its outputs in base and file.
56: proc parse_get_line: string*lvalue[bool]*lvalue[string]*lvalue[string]
57: = '$2 = split_getline($1, $3, $4);';
58: // substr(s, n): binds to the C++ call s.substr(n).
59: fun substr: string*int -> string = "$1.substr($2)";
60:
61:
62:
63:
64:
65: // Canned response header written before an HTML body; the Date, Last-Modified and ETag values are fixed text.
66: val html_header = """
67: HTTP/1.1 200 OK
68: Date: Tue, 25 Apr 2006 00:16:12 GMT
69: Server: felix web server
70: Last-Modified: Wed, 01 Feb 2006 18:51:37 GMT
71: ETag: "6d8029-25c-43e10339"
72: Accept-Ranges: bytes
73: Connection: close
74: Content-Type: text/html
75:
76: """;
77: // Canned response header written before a GIF body.
78: // NOTE(review): these lines end in \r while the html_header lines do not, and the final blank line here has no \r -- confirm which line ending is intended.
79: val gif_header = """
80: HTTP/1.1 200 OK\r
81: Date: Sun, 30 Apr 2006 07:14:50 GMT\r
82: Server: felix web server\r
83: Last-Modified: Sun, 28 Nov 2004 18:59:31 GMT\r
84: ETag: "7f004d-586-41aa2013"\r
85: Accept-Ranges: bytes\r
86: Connection: close\r
87: Content-Type: image/gif\r
88:
89: """;
90: // substitute(s, a, b, res): store in *res a copy of s in which every character equal to a is replaced by b.
91: proc substitute(s: string, a: char, b: char, res: &string)
92: {
93: var s2: string;
94: var slen = len s;
95: var i: int;
96: // Build the result one character at a time in s2.
97: for_each{i=0;}{i<slen}{i++;}
98: {
99: if s.[i] == a then
100: { s2 += b; } else
101: { s2 += s.[i]; } endif;
102:
103: };
104:
105: *res = s2;
106: }
107: // serve_file(infname, s): write a canned header followed by the contents of the named file to socket s.
108: proc serve_file(infname: string, s: flx_socket)
109: {
110: var fname: string;
111:
112:
113: // An empty name is served as index.html.
114: // NOTE(review): fname is opened exactly as given; nothing here rejects ".." or absolute paths.
115: if "" == infname then { fname = "index.html"; }else{ fname = infname;}endif;
116:
117:
118:
119: print "serve file: "; print fname; endl;
120:
121:
122:
123: // suffix is the text after the last '.' in fname; it stays at its initial
124: // value when fname contains no '.'.
125: var suffix: string;
126: var dotpos = stl_rfind(fname, char ".");
127:
128: if stl_npos != dotpos then { suffix = substr(fname, dotpos+1); }
129: else {} endif;
130:
131: print "suffix is "; print suffix; endl;
132:
133: #if WIN32
134: var wname: string;
135:
136: // Map forward slashes to backslashes before opening the file.
137: substitute(fname, char '/', char '\\', &wname);
138: print "mapped "; print fname; print " -> "; print wname; endl;
139:
140:
141: var wf: WFILE <- OpenFile(wname);
142: // On open failure only a diagnostic is printed; nothing is written to s.
143: if wf == INVALID_HANDLE_VALUE then
144: {
145: print "BUGGER: OpenFile failed: "; print (GetLastError()); endl;
146: } else {
147: print "opened "; print wname; endl;
148:
149: // gif_header for a "gif" suffix, html_header for everything else.
150: if("gif" == suffix) then { write_string(s, gif_header); }
151: else { write_string(s, html_header); } endif;
152:
153: print "Transmitting file!\n";
154: TransmitFile(s, wf);
155:
156:
157: CloseFile(wf);
158: } endif;
159: #elif POSIX
160:
161:
162:
163: var fd = aio_ropen(fname);
164: // On open failure only a diagnostic is printed; nothing is written to s.
165: if fd == -1 then
166: {
167: print "BUGGER, posix open failed\n";
168: } else {
169: print "got fd="; print fd; endl;
170:
171:
172: // gif_header for a "gif" suffix, html_header for everything else.
173: if("gif" == suffix) then { write_string(s, gif_header); }
174: else { write_string(s, html_header); } endif;
175: // Wrap the file descriptor and the socket as streams and copy one to the other.
176: var from_strm: flx_stream = UFD fd;
177: var to_strm: flx_stream = USOCK s;
178: cat(from_strm, to_strm);
179:
180: flx_close(from_strm);
181: } endif;
182:
183:
184:
185:
186:
187:
188: #endif
189: }
190: // Main program: listen on webby_port and hand each accepted connection to a new fthread.
191: val webby_port = 1234;
192:
193: print "FLX WEB!!! listening on port "; print webby_port; endl;
194:
195: // The port is copied into a var because mk_flx_listener takes it by address.
196: var p = webby_port;
197: var listener: flx_listener;
198: mk_flx_listener(&listener, &p, 10);
199:
200:
201: forever {
202: var s: flx_socket;
203: flx_accept(listener, &s);
204: print "got connection (or something)\n";
205:
206:
207: print "spawning fthread to handle connection\n";
208: spawn_fthread {
209:
210: // Only the first line of the request is read.
211: var line: string;
212: get_line(s, &line);
213: // poo is the text after a leading "GET ", or "" otherwise; it is printed only.
214: val poo =
215: if "GET " == line.[0 to 4] then line.[4 to ] else "" endif;
216: print ("poo="poo); endl;
217:
218:
219: print "got line: "; print line; endl;
220:
221:
222:
223: var succ: bool;
224: var base: string;
225: var file: string;
226:
227: parse_get_line(line, succ, base, file);
228:
229: // Serve the requested file only when the line parsed; the socket is closed either way.
230: if succ then {
231: print "well formed get...\n";
232: print "base="; print base; endl;
233: print "file="; print file; endl;
234:
235: serve_file(file, s);
236: } else {
237: print "BAD get line: "; print line; endl;
238: } endif;
239:
240: flx_close(s);
241:
242: };
243:
244: };
245: // NOTE(review): this follows a forever loop, so it is presumably never reached -- confirm.
246: flx_close(listener);
247:
248: