6. Felix Async IO tests

Start felix section to test/faio_01.flx[1 /1 ]
     1: #line 2415 "./lpsrc/flx_faio.pak"
     2: // portable flx sockets test (rt = regression test)
     3: 
     4: #import <flx.flxh>
     5: include "flx_socket";
     6: open Flx_socket;
     7: 
     8: print "flx tcp stream test\n";
     9: // System::exit 0;  // Work in progress
    10: 
    11: var listener: flx_listener;
    12: var port = 0;  // you choose
    13: // var port = 1024;
    14: 
    15: // check errors. how is that done?
    16: mk_flx_listener(&listener, &port, 1);
    17: // print "Got port: "; print port; endl;
    18: // print "accepting\n";
    19: 
    20: print "spawning connector\n";
    21: // not printing in thread to make output deterministic.
    22: // note that the connect shouldn't fail (immediately) because the
    23: // listener exists - it just hasn't called accept yet.
    24: spawn_fthread
    25: {
    26:   {
    27:     // print "Connector dude\n";  // get rid of, hard to test
    28:     var c: flx_socket;
    29:     flx_connect(&c, c"127.0.0.1", port); // connect to localhost
    30:     var str: string;
    31: 
    32:     get_line(c, &str);
    33:     print "connector got "; print str; endl;
    34:     write_string(c, "thanks\n");         // newline important
    35: 
    36:     flx_close(c);  // finished with this
    37:   };
    38: };
    39: 
    40: var s: flx_socket;
    41: flx_accept(listener, &s);
    42: flx_close(listener);  // not needed anymore
    43: 
    44: print "got connection\n";
    45: write_string(s, "server says hi\n");     // newline important here
    46: 
    47: var str: string;
    48: get_line(s, &str);
    49: 
    50: print "server got "; print str; endl;
    51: flx_close(s);
    52: 
End felix section to test/faio_01.flx[1]
Start felix section to test/posix_t1.flx[1 /1 ]
     1: #line 2468 "./lpsrc/flx_faio.pak"
     2: #import <flx.flxh>
     3: include "flx_faio_posix";
     4: open Faio_posix;
     5: 
     6: // create a listening socket, spawn a thread to connect to it.
     7: // in case something goes wrong could make test time out with
     8: // spawn_fthread { { sleep 5.0; System::exit 1; }; };
     9: print "felix posix accept/connect test\n";
    10: 
    11: var port = 0;   // let mk_listener choose the port
    12: print "creating listener\n";
    13: var listener: int <- mk_listener(port, 1);
    14: 
    15: print "spawning connector\n";
    16: // not printing in thread to make output repeatable in
    17: // the face of scheduler changes.
    18: spawn_fthread{ { var c: int; connect(&c, c"127.0.0.1", port); }; };
    19: 
    20: var s: int;
    21: accept (&s, listener);  // async!
    22: if s == -1 then {
    23:   System::exit 1;
    24: } else {
    25:   print "accepted connection\n";
    26:   System::exit 0;
    27: } endif;
    28: 
End felix section to test/posix_t1.flx[1]
Start felix section to test/posix_t2.flx[1 /1 ]
     1: #line 2497 "./lpsrc/flx_faio.pak"
     2: #import <flx.flxh>
     3: include "flx_faio_posix";
     4: open Faio_posix;
     5: 
     6: header "typedef struct { char dat[8]; } tstbuf;";
     7: ctypes tstbuf;
     8: proc dprint: tstbuf = 'printf("%.8s", $1.dat);';
     9: fun get_data: tstbuf -> address = "$1.dat";
    10: fun get_data: charp -> address = "$1";
    11: 
    12: // try to send some data down a socket
    13: var port = 0;   // let mk_listener choose the port
    14: var listener: int <- mk_listener(port, 1);
    15: 
    16: // not printing in thread to make output repeatable in
    17: // the face of scheduler changes.
    18: spawn_fthread{
    19:   {
    20:     var c: int;
    21:     connect(&c, c"127.0.0.1", port);
    22: 
    23:     var len = 8;
    24:     var eof: bool;
    25:     async_write(c, &len, get_data((c"faio2you")), &eof);
    26:     shutdown(c, 1);  // no further writes (wakes reader)
    27: 
    28:     var b: tstbuf;
    29:     async_read(c, &len, b.data, &eof);
    30:     print "connector read "; dprint b; endl;
    31:     System::exit 0;
    32:   };
    33: };
    34: 
    35: var s: int;
    36: accept (&s, listener);  // async!
    37: var b: tstbuf;
    38: var len = 16;           // ask for more than there is and rely on shutdown
    39: var eof: bool;
    40: async_read(s, &len, b.data, &eof);
    41: print "acceptor read "; print len; print " bytes: "; dprint b; endl;
    42: async_write(s, &len, get_data((c"thanks!!")), &eof);
    43: 
End felix section to test/posix_t2.flx[1]
Start felix section to test/posix_t3.flx[1 /1 ]
     1: #line 2541 "./lpsrc/flx_faio.pak"
     2: #include <flx.flxh>
     3: // actually portable, but I don't know the scheme for
     4: // adding those tests.
     5: include "flx_stream";
     6: print "more output here\n";
     7: 
End felix section to test/posix_t3.flx[1]
Start felix section to test/win_t1.flx[1 /1 ]
     1: #line 2549 "./lpsrc/flx_faio.pak"
     2: #import <flx.flxh>
     3: include "flx_faio_win32";
     4: open Faio_win32;
     5: 
     6: var port = 1234;  // can't yet get os to choose the port. should fix that.
     7: var listener: SOCKET;
     8: mk_listener(&listener, &port, 1);
     9: 
    10: print "spawning connector\n";
    11: // not printing in thread to make output repeatable in
    12: // the face of scheduler changes.
    13: spawn_fthread{
    14:   {
    15:     var c: SOCKET;
    16:     Connect(&c, c"127.0.0.1", port);
    17:   };
    18: };
    19: 
    20: var s: SOCKET;
    21: var success: bool;
    22: mk_socket(&s);    // for async accept on win32 you create the accept socket yourself
    23: Accept(&success, listener, s);
    24: 
    25: if success then {
    26:   print "successful accept!\n";
    27:   System::exit 0;
    28: } else {
    29:   print "accept failed!\n";
    30:   System::exit 1;
    31: } endif;
    32: 
    33: 
End felix section to test/win_t1.flx[1]
Start felix section to test/win_t2.flx[1 /1 ]
     1: #line 2583 "./lpsrc/flx_faio.pak"
     2: #import <flx.flxh>
     3: include "Flx_faio_win32";
     4: open Faio_win32;
     5: 
     6: // let's add a win32 namedpipe test!
     7: // type WFILE is a HANDLE. that should work.
     8: // note the "r" for felix string raw mode.
     9: var pname = r"\\.\pipe\flx_pipe";
    10: 
    11: // CreateNamedPipe binding (put in a library somewhere)
    12: // duplex, byte stream, one instance,
    13: // guessing 256 bytes for input and output buffers
    14: // 0 default timeout (not using default wait, shouldn't matter)
    15: // default security attributes
    16: // associating with the iocp doesn't work, the pipe must be created with
    17: // FILE_FLAG_OVERLAPPED set in dwOpenMode. Otherwise OVERLAPPED calls block.
    18: proc CreateNamedPipe: lvalue[WFILE]*string =
    19:  '$1 = CreateNamedPipe($2.c_str(), PIPE_ACCESS_DUPLEX | FILE_FLAG_OVERLAPPED,\
    20:      PIPE_TYPE_BYTE, 1, 256, 256, 0, NULL);';
    21: 
    22: // OpenFile in flx_faio_win32 should work fine here
    23: 
    24: print "Creating named pipe "; print pname; endl;
    25: var pipe: WFILE <- CreateNamedPipe(pname);
    26: 
    27: // how to connect with this stuff? can I open it non blockingly?
    28: if pipe == INVALID_HANDLE_VALUE then
    29: {
    30:   print "BUGGER: CreateNamedPipeFailed: "; print (GetLastError()); endl;
    31: } else {
    32:   print "whoo!\n";
    33: } endif;
    34: 
    35: // let's try to associate with IOCP here...
    36: // Hack: the interface requires a SOCKET, which it then casts to a HANDLE.
    37: // don't forget to create pipe with FILE_FLAG_OVERLAPPED, else we
    38: // error #87 -> ERROR_INVALID_PARAMETER
    39: fun HACK_TO_SOCKET: WFILE -> SOCKET = '(SOCKET)$1';
    40: associate_with_iocp(HACK_TO_SOCKET(pipe));
    41: 
    42: header = """
    43: 
    44: using namespace flx;
    45: using namespace demux;
    46: using namespace faio;
    47: 
    48: // wrap up ConnectNamedPipe. This function normally blocks, so we must
    49: // use it in a way that allows us to deschedule the fthread.
    50: class connect_namedpipe
    51:   : public waio_base,  public flx::demux::iocp_wakeup {
    52:   HANDLE pipe;       // for demux
    53: public:
    54:   connect_namedpipe(HANDLE p = INVALID_HANDLE_VALUE) : pipe(p) {}
    55: 
    56:   // this belongs in demux class
    57:   virtual bool start_overlapped()
    58:   {
    59:     fprintf(stderr, "connect named pipe start overlapped %p\\n", pipe);
    60: 
    61:     clear_overlapped();  // everyone's doing this
    62: 
    63:     BOOL success;
    64:     success = ConnectNamedPipe(pipe, &ol);
    65: 
    66:     // fprintf(stderr, "Connect named pipe: %i\\n", success);
    67:     if(success)
    68:     {
    69:       // this shouldn't happen ever. make it an error.
    70:       fprintf(stderr, "ConnectNamedPipe SUCCEEDED (shouldn't happen)\\n");
    71:     } else {
    72:       int err = GetLastError();
    73:       fprintf(stderr, "ConnectNamedPipe returned %i\\n", err);
    74: 
    75:       // this doesn't always signify failure.
    76:       switch(err)
    77:       {
    78:          case ERROR_PIPE_CONNECTED: // got a connection already
    79:            fprintf(stderr, "ALREADY GOT CONNECTION\\n");
    80:            // do fake wakeup here for great greatness. eats user cookie.
    81:            iocp_op_finished(0, 0, &ol, NO_ERROR);
    82:            return true;  // async finished
    83:          break;
    84: 
    85:          case ERROR_IO_PENDING:  // completion packet is in the mail
    86:            fprintf(stderr, "Connection pending... (normal)\\n");
    87:            return false;         // not finished, packet still to come
    88:          break;
    89:          default:
    90:            fprintf(stderr, "ConnectNamedPipe FAILED (%i)\\n", err);
    91:          break;
    92:       }
    93: 
    94:     }
    95: 
    96:     return false;  // let's not get woken, packet still to come (\?)
    97:   }
    98: 
    99:   // this belongs in faio class
   100:   virtual bool start_async_op(demux::demuxer& demux, flx_drv* drv, void* f)
   101:   {
   102:     fprintf(stderr, "start_async_op for named pipe\\n");
   103:     RECORD_THREAD_INFO(fw);   // records enough info for wakeup
   104:     return start_overlapped();
   105:   }
   106: 
   107:   // as does this
   108:   virtual void iocp_op_finished(DWORD nbytes, ULONG_PTR udat,
   109:     LPOVERLAPPED olp, int err)
   110:   {
   111:     fprintf(stderr, "connect named pipe FINISHED! err: %i\\n", err);
   112:     waio_base::iocp_op_finished(nbytes, udat, olp, err);
   113:   }
   114: 
   115: 
   116: };
   117: """;
   118: 
   119: proc getbyte(f: WFILE, outc: &char)
   120: {
   121:   var c: char;
   122:   val ac = C_hack::address_of(c);
   123:   var len = 1;
   124:   var eof: bool;
   125: 
   126:   ReadFile(f, &len, ac, &eof);
   127:   *outc = c;
   128: }
   129: 
   130: proc putbyte(f: WFILE, c: char)
   131: {
   132:   var copyc = c;
   133:   val ac = C_hack::address_of(copyc);
   134:   var len = 1;
   135:   var eof: bool;
   136: 
   137:   WriteFile(f, &len, ac, &eof);
   138: }
   139: 
   140: // spawn a connect fthread after having created the named pipe
   141: 
   142: // I'm not yet "accepting", but hopefully I can spawn a client here
   143: spawn_fthread {
   144:   // print "Gudday, client thread, trying to open PIPE\n";
   145:   // var p: WFILE <- OpenFile(pname);
   146:   // having trouble getting io...
   147:   var p: WFILE <- OpenFileDuplex(pname);
   148: 
   149:   // print "Client thread returned from OpenFile\n";
   150: 
   151:   if p == INVALID_HANDLE_VALUE then
   152:   { print "BUGGER: client couldn't open pipe: "; print (GetLastError()); endl; }
   153:   else { print "client opened pipe\n"; } endif;
   154: 
   155:   // HUM need to associate.
   156:   print "associating client\n";
   157:   associate_with_iocp(HACK_TO_SOCKET(p));
   158: 
   159:   var c: char;
   160:   getbyte(p, &c);
   161:   print "client got "; print c; endl;
   162:   putbyte(p, char 'b');
   163:   CloseFile(p);
   164: };
   165: 
   166: type np_request = 'connect_namedpipe';
   167: fun mk_np_request: WFILE -> np_request = 'connect_namedpipe($1)';
   168: 
   169: var npr = mk_np_request(pipe);
   170: Faio::faio_req$ &npr;
   171: 
   172: print "poot! got connection (maybe)\n";
   173: print "server trying to put byte\n";
   174: putbyte(pipe, char 'a');
   175: var c: char;
   176: getbyte(pipe, &c);
   177: print "server got "; print c; endl;
   178: CloseFile(pipe);
   179: 
End felix section to test/win_t2.flx[1]
Start data section to test/faio_01.expect[1 /1 ]
     1: flx tcp stream test
     2: spawning connector
     3: got connection
     4: connector got server says hi
     5: server got thanks
End data section to test/faio_01.expect[1]
Start data section to test/posix_t1.expect[1 /1 ]
     1: felix posix accept/connect test
     2: creating listener
     3: spawning connector
     4: accepted connection
End data section to test/posix_t1.expect[1]
Start data section to test/posix_t2.expect[1 /1 ]
     1: acceptor read 8 bytes: faio2you
     2: connector read thanks!!
End data section to test/posix_t2.expect[1]
Start data section to test/posix_t3.expect[1 /1 ]
     1: more output here
End data section to test/posix_t3.expect[1]
Start data section to test/win_t1.expect[1 /1 ]
     1: spawning connector
     2: successful accept!
End data section to test/win_t1.expect[1]
Start data section to test/win_t2.expect[1 /1 ]
     1: Creating named pipe \\.\pipe\flx_pipe
     2: whoo!
     3: client opened pipe
     4: associating client
     5: poot! got connection (maybe)
     6: server trying to put byte
     7: client got a
     8: server got b
End data section to test/win_t2.expect[1]
Start felix section to test/demo_webserver.flx[1 /1 ]
     1: #line 2799 "./lpsrc/flx_faio.pak"
     2: #import <flx.flxh>
     3: #import <flx_platform.flxh>
     4: 
     5: #if POSIX
     6: //include "flx_faio_posix";  // aio_ropen
     7: //open Faio_posix;
     8: #endif
     9: 
    10: 
    11: include "flx_socket";
    12: open Flx_socket;
    13: 
    14: header = """
    15: string
    16: getline_to_url(const string& get)
    17: {
    18:     // chomp off "GET " (should check it)
    19:     if(get.length() < 4) return "";
    20: 
    21:     unsigned int pos = get.substr(4).find(' ');
    22: 
    23:     if(pos == string::npos) return "";
    24: 
    25:     return get.substr(4, pos);
    26: }
    27: 
    28: // split url into base and file name http://foo.com/file.html
    29: // -> http://foo.com + file.html. failure returns nothing.
    30: bool
    31: split_url(const string& inurl, string& base, string& file)
    32: {
    33:     // munch leading http:// if present
    34:     string url;
    35:     if(inurl.length() >= 7 && inurl.substr(0, 7) == "http://")
    36:       url = inurl.substr(7);
    37:     else
    38:       url = inurl;
    39: 
    40:     unsigned int pos = url.find('/');
    41: 
    42:     if(string::npos == pos)  return false;       // all bad
    43: 
    44:     base = url.substr(0, pos);
    45:     file = url.substr(pos+1);
    46:     return true;            // all good
    47: }
    48: 
    49: bool
    50: split_getline(const string& get, string& base, string& file)
    51: {
    52:     return split_url(getline_to_url(get), base, file);
    53: }
    54: """;
    55: 
    56: proc parse_get_line: string*lvalue[bool]*lvalue[string]*lvalue[string]
    57:  = '$2 = split_getline($1, $3, $4);';
    58: 
    59: fun substr: string*int -> string = "$1.substr($2)";
    60: 
    61: // TODO: fill in that length field, stream back the requested jpeg,
    62: // get port from argv.
    63: // took out the Content-Length: 604. line. wasn't right anyway. still
    64: // works with firefox that goes up to EOF.
    65: // what's ETag?
    66: val html_header = """
    67: HTTP/1.1 200 OK
    68: Date: Tue, 25 Apr 2006 00:16:12 GMT
    69: Server: felix web server
    70: Last-Modified: Wed, 01 Feb 2006 18:51:37 GMT
    71: ETag: "6d8029-25c-43e10339"
    72: Accept-Ranges: bytes
    73: Connection: close
    74: Content-Type: text/html
    75: 
    76: """;
    77: 
    78: // deleted Content-Length: 1414
    79: val gif_header = """
    80: HTTP/1.1 200 OK\r
    81: Date: Sun, 30 Apr 2006 07:14:50 GMT\r
    82: Server: felix web server\r
    83: Last-Modified: Sun, 28 Nov 2004 18:59:31 GMT\r
    84: ETag: "7f004d-586-41aa2013"\r
    85: Accept-Ranges: bytes\r
    86: Connection: close\r
    87: Content-Type: image/gif\r
    88: 
    89: """;
    90: 
    91: proc substitute(s: string, a: char, b: char, res: &string)
    92: {
    93:   var s2: string;
    94:   var slen = len s;
    95:   var i: int;
    96: 
    97:   for_each{i=0;}{i<slen}{i++;}
    98:   {
    99:      if s.[i] == a then
   100:      { s2 += b; } else
   101:      { s2 += s.[i]; } endif;
   102: 
   103:   };
   104: 
   105:   *res = s2;
   106: }
   107: 
   108: proc serve_file(infname: string, s: flx_socket)
   109: {
   110:   var fname: string;
   111: 
   112:   // if empty string, serve index.html
   113:   // not quite right - needs to handle directories too, so
   114:   // not only foo.com/ -> index.html, but foo.com/images/ -> images/index.html
   115:   if "" == infname then { fname = "index.html"; }else{ fname = infname;}endif;
   116: 
   117:   // set mime type depending on extension...
   118:   // serve a "not found page" for that case (check for recursion)
   119:   print "serve file: "; print fname; endl;
   120: 
   121:   // this isn't right, don't want the contents parsed as text, want them
   122:   // sent faithfully over the wire. of course doesn't work for jpegs and other
   123:   // binary formats.
   124: 
   125:   var suffix: string;
   126:   var dotpos = stl_rfind(fname, char ".");
   127:   // print "dotpos = "; print dotpos; endl;
   128:   if stl_npos != dotpos then { suffix = substr(fname, dotpos+1); }
   129:   else {} endif;
   130: 
   131:   print "suffix is "; print suffix; endl;
   132: 
   133: #if WIN32
   134:   var wname: string;
   135: 
   136:   // quick 'n' dirty unix -> dos style pathnames
   137:   substitute(fname, char '/', char '\\', &wname);
   138:   print "mapped "; print fname; print " -> "; print wname; endl;
   139:   // send header
   140:   // TransmitFile
   141:   var wf: WFILE <- OpenFile(wname);
   142: 
   143:   if wf == INVALID_HANDLE_VALUE then
   144:   {
   145:     print "BUGGER: OpenFile failed: "; print (GetLastError()); endl;
   146:   } else {
   147:     print "opened "; print wname; endl;
   148: 
   149:     // mime type mapping from suffix. make better here.
   150:     if("gif" == suffix) then { write_string(s, gif_header); }
   151:     else { write_string(s, html_header); } endif;
   152: 
   153:     print "Transmitting file!\n";
   154:     TransmitFile(s, wf);
   155: 
   156:     // send footer
   157:     CloseFile(wf);
   158:   } endif;
   159: #elif POSIX
   160:   // this fn sets the O_NONBLOCK flag which is completely unnecessary
   161:   // as flx_read goes via the preading worker fifo. don't know if
   162:   // O_NONBLOCK even works on actual files.
   163:   var fd = aio_ropen(fname);
   164: 
   165:   if fd == -1 then
   166:   {
   167:     print "BUGGER, posix open failed\n";
   168:   } else {
   169:     print "got fd="; print fd; endl;
   170: 
   171:     // mime type mapping from suffix. make better here.
   172:     // factor out
   173:     if("gif" == suffix) then { write_string(s, gif_header); }
   174:     else { write_string(s, html_header); } endif;
   175: 
   176:     var from_strm: flx_stream = UFD fd;
   177:     var to_strm: flx_stream = USOCK s;
   178:     cat(from_strm, to_strm);
   179: 
   180:     flx_close(from_strm); // this'll know how to close a unix fd
   181:   } endif;
   182: 
   183:   // var contents = Text_file::load(fname);
   184:   // print "loaded: "; print contents; endl;
   185:   // print "contents len="; print (len contents); endl;
   186:   // write_string(s, html_header + contents);
   187: 
   188: #endif
   189: }
   190: 
   191: val webby_port = 1234;
   192: 
   193: print "FLX WEB!!! listening on port "; print webby_port; endl;
   194: 
   195: // up the queue len for stress testing
   196: var p = webby_port;
   197: var listener: flx_listener;
   198: mk_flx_listener(&listener, &p, 10);
   199: 
   200: 
   201: forever {
   202:   var s: flx_socket;
   203:   flx_accept(listener, &s);  // blocking
   204:   print "got connection (or something)\n";  // error check here
   205: 
   206:   // hmm - spawning an fthread is blocking the web server. don't know why
   207:   print "spawning fthread to handle connection\n";
   208:   spawn_fthread {
   209: // should spawn fthread here to allow for more io overlap
   210: 
   211:   var line: string;
   212:   get_line(s, &line);  // should be the GET line.
   213: 
   214:   val poo =
   215:   if "GET " == line.[0 to 4] then line.[4 to ] else "" endif;
   216:   print ("poo="poo); endl;
   217: 
   218: //print ("blah " line.[0 to 4]); endl;
   219:   print "got line: "; print line; endl;
   220: 
   221:   // now I need to parse the GET line, get a file name out of its url
   222:   // (e.g. unqualfied -> index.html and name/flx.jpg -> flx.jpg
   223:   var succ: bool;
   224:   var base: string;
   225:   var file: string;
   226: 
   227:   parse_get_line(line, succ, base, file);
   228:   // print "succ="; print succ; endl;
   229: 
   230:   if succ then {
   231:     print "well formed get...\n";
   232:     print "base="; print base; endl;
   233:     print "file="; print file; endl;
   234: 
   235:     serve_file(file, s);
   236:   } else {
   237:     print "BAD get line: "; print line; endl;
   238:   } endif;
   239: 
   240:   flx_close(s);
   241: 
   242:   };
   243: 
   244: };
   245: 
   246: flx_close(listener);
   247: 
   248: 
End felix section to test/demo_webserver.flx[1]