7. Windows I/O

Start cpp section to faio/faio_winio.hpp[1 /1 ]
     1: #line 4 "./lpsrc/flx_winio.ipk"
     2: #ifndef __FLX_FAIO_WINIO_H__
     3: #define __FLX_FAIO_WINIO_H__
     4: #include <flx_faio_config.hpp>
     5: 
     6: // visual studio is quite sensitive about how you do these includes.
     7: // THIS is the way (WinSock2.h must include Windows.h).
     8: #include <WinSock2.h>
     9: #include <MSWSock.h>        // AcceptEx, TF_REUSE_SOCKET, etc
    10: 
    11: #include "faio_asyncio.hpp" // flx driver requests
    12: #include "demux_overlapped.hpp"   // nicely wrapped async windows calls
    13: 
    14: namespace flx { namespace faio {
    15: 
    16: // interestingly, because in windows the async objects are associated
    17: // with an IOCP before their use, we don't actually need a demuxer here
    18: // at all. That's kind of nice. (actually iocp_associator uses it now)
    19: 
    20: // a flx driver request to add the socket s to the driver's iocp
    21: // this is currently the only windows driver request that uses the demuxer.
    22: class FAIO_EXTERN iocp_associator : public flx_driver_request_base {
    23:   SOCKET  s;                  // socket handed to associate_with_iocp
    24: public:
    25:   demux::iocp_demuxer *iod;   // demuxer whose associate_with_iocp is called
    26:   // should have result & errcode
    27:   iocp_associator() : s(INVALID_SOCKET), iod(0) {}  // no socket yet: keep s well defined
    28:   iocp_associator(demux::iocp_demuxer *iod_a, SOCKET associatee)
    29:   : s(associatee), iod(iod_a) {}  // initialisers listed in declaration order
    30: 
    31:   bool start_async_op_impl();     // performs the association, always returns true
    32: };
    33: 
    34: // flx <-> c++ stuff for async io (well, it was)
    35: 
    36: // transition to new windows async control block
    37: class FAIO_EXTERN waio_base : public flx_driver_request_base {
    38: protected:
    39:   finote_t *fn_a;               // not assigned anywhere in this file; starts out null
    40: public:
    41:   demux::iocp_demuxer *iod;     // demuxer passed in at construction (null by default)
    42:   bool  success;                // set by iocp_op_finished: true iff err was NO_ERROR
    43: 
    44:   waio_base() : fn_a(0), iod(0), success(false) {}
    45:   waio_base(demux::iocp_demuxer *iod_a) : fn_a(0), iod(iod_a), success(false) {}
    46: 
    47:   // actually wakes up thread: records success, then calls notify_finished()
    48:   virtual void iocp_op_finished( DWORD nbytes, ULONG_PTR udat,
    49:     LPOVERLAPPED olp, int err);
    50: };
    51: 
    52: 
    53: // listener socket must be already associated with an IOCP
    54: // in doing an AcceptEx, it might succeed immediately - do you still
    55: // get the IOCP wakeup?
    56: class FAIO_EXTERN wasync_accept
    57:   : public waio_base, public demux::acceptex_control_block
    58: {
    59: public:
    60:   wasync_accept() {}  // felix linkage demands it
    61: 
    62:   wasync_accept(demux::iocp_demuxer *iod_a,SOCKET l, SOCKET a) : waio_base(iod_a) { listener = l; acceptor = a; }  // l: listening socket, a: socket that receives the connection
    63: 
    64:   bool start_async_op_impl();  // returns the result of start_overlapped()
    65: 
    66:   virtual void iocp_op_finished( DWORD nbytes, ULONG_PTR udat,
    67:     LPOVERLAPPED olp, int err);  // forwards unchanged to waio_base::iocp_op_finished
    68: };
    69: 
    70: class FAIO_EXTERN connect_ex
    71:   : public waio_base, public demux::connectex_control_block
    72: {
    73: public:
    74: 
    75:   connect_ex() {}     // flx linkage
    76: 
    77:   connect_ex(demux::iocp_demuxer *iod_a,SOCKET soc, const char* addr, int port)
    78:     : waio_base(iod_a) { s = soc; addy = addr; p = port; }  // NOTE(review): addr looks to be stored as a pointer - confirm it outlives the request
    79: 
    80:   bool start_async_op_impl();  // returns the result of start_overlapped()
    81: 
    82:   virtual void iocp_op_finished( DWORD nbytes, ULONG_PTR udat,
    83:     LPOVERLAPPED olp, int err);  // forwards unchanged to waio_base::iocp_op_finished
    84: };
    85: 
    86: // TransmitFile here (requires file handle)
    87: class FAIO_EXTERN wasync_transmit_file
    88:   : public waio_base, public demux::transmitfile_control_block
    89: {
    90: public:
    91:   wasync_transmit_file()
    92:     : waio_base(0), transmitfile_control_block(INVALID_SOCKET, NULL) {}   // flx linkage: no demuxer, no socket, no file
    93: 
    94:   wasync_transmit_file(demux::iocp_demuxer *iod_a,SOCKET dst)      // for reuse of socket (no file handle given)
    95:     : waio_base(iod_a), transmitfile_control_block(dst) {}
    96: 
    97:   wasync_transmit_file(demux::iocp_demuxer *iod_a,SOCKET dst, HANDLE src)  // actual transmitfile: src file -> dst socket
    98:     : waio_base(iod_a), transmitfile_control_block(dst, src) {}
    99: 
   100:   // from flx_request_base; returns the result of start_overlapped()
   101:   bool start_async_op_impl();
   102: 
   103:   virtual void iocp_op_finished(DWORD nbytes, ULONG_PTR udat,
   104:     LPOVERLAPPED olp, int err);  // forwards unchanged to waio_base::iocp_op_finished
   105: };
   106: 
   107: // handles both WSASend & WSARecv
   108: class FAIO_EXTERN wsa_socketio
   109:   : public waio_base, public demux::wsasocketio_control_block
   110: {
   111: public:
   112:   wsa_socketio()
   113:     : wsasocketio_control_block(INVALID_SOCKET, NULL, false) {}  // no socket, no buffer; waio_base default-constructed
   114: 
   115:   wsa_socketio(demux::iocp_demuxer *iod_a,SOCKET src, demux::sel_param* ppb, bool read)
   116:     : waio_base(iod_a), wsasocketio_control_block(src, ppb, read) {}  // read: true = receive, false = send (per the felix callers)
   117: 
   118:   bool start_async_op_impl();  // returns the result of start_overlapped()
   119: 
   120:   virtual void iocp_op_finished( DWORD nbytes, ULONG_PTR udat,
   121:     LPOVERLAPPED olp, int err);  // accumulates bytes; restarts the op until the buffer is done
   122: };
   123: 
   124: // looks a bit like wsa_socketio (bad name, sends too)
   125: class FAIO_EXTERN winfile_io
   126:   : public waio_base, public demux::winfileio_control_block
   127: {
   128: public:
   129:   winfile_io()      // flx linkage: null handle, null buffer, zero length
   130:     : winfileio_control_block(NULL, NULL, 0, false){}
   131: 
   132:   // offset? (none is passed; f: file handle, buf/len: caller's buffer, read: direction)
   133:   winfile_io(demux::iocp_demuxer *iod_a,HANDLE f, void* buf, int len, bool read)
   134:     : waio_base(iod_a), winfileio_control_block(f, buf, len, read) {}
   135: 
   136:   bool start_async_op_impl();  // returns the result of start_overlapped()
   137: 
   138:   virtual void iocp_op_finished( DWORD nbytes, ULONG_PTR udat,
   139:     LPOVERLAPPED olp, int err);  // adds nbytes to pb.bytes_written, then wakes the caller
   140: };
   141: 
   142: 
   143: }}
   144: #endif  // __FLX_FAIO_WINIO_H__
   145: 
End cpp section to faio/faio_winio.hpp[1]
Start cpp section to faio/faio_winio.cpp[1 /1 ]
     1: #line 150 "./lpsrc/flx_winio.ipk"
     2: #include "faio_winio.hpp"
     3: #include <stdio.h>      // printf
     4: using namespace flx::demux;
     5: namespace flx { namespace faio {
     6: 
     7: // way of adding sockets to the IOCP.
     8: bool
     9: iocp_associator::start_async_op_impl()
    10: {
    11:   fprintf(stderr,"iocp_associator: start async_op_impl\n");  // debug trace
    12: 
    13:   // nasty: note how I'm making the user cookie constant (0).
    14:   if(iod->associate_with_iocp((HANDLE)s, 0) != 0)  // NOTE(review): iod is dereferenced unchecked; it is null after the default ctor
    15:     fprintf(stderr,"associate request failed - get result here!\n");  // failure is only logged, not stored
    16: 
    17:   return true;      // wake caller (returned whether or not the association succeeded)
    18: }
    19: 
    20: void
    21: waio_base::iocp_op_finished( DWORD nbytes, ULONG_PTR udat,
    22:   LPOVERLAPPED olp, int err)
    23: {
    24:   // fprintf(stderr,"general wakeup thing - rescheduling\n");
    25:   //fprintf(stderr,"this: %p, q: %p, f: %p, err: %i\n", this, q, f, err);
    26: 
    27:   // this tells us when things went wrong (the code itself is only logged, not stored)
    28:   if(NO_ERROR != err)
    29:     fprintf(stderr,"catchall wakeup got error: %i (should store it)\n", err);
    30: 
    31:   success = (NO_ERROR == err);  // this works pretty well; nbytes, udat and olp are unused here
    32:   notify_finished();            // signal completion of this request
    33: }
    34: 
    35: bool
    36: wasync_accept::start_async_op_impl()
    37: {
    38:   fprintf(stderr,"wasync_accept: start async_op_impl\n");  // debug trace
    39:   return start_overlapped();  // hand back whatever start_overlapped() reports
    40: }
    41: 
    42: void
    43: wasync_accept::iocp_op_finished( DWORD nbytes, ULONG_PTR udat,
    44:   LPOVERLAPPED olp, int err)
    45: {
    46:   waio_base::iocp_op_finished(nbytes, udat, olp, err);  // no accept-specific handling: record success and notify
    47: }
    48: 
    49: 
    50: bool
    51: connect_ex::start_async_op_impl()
    52: {
    53:   fprintf(stderr,"connect_ex: start async_op_impl\n");  // debug trace
    54:   return start_overlapped();  // hand back whatever start_overlapped() reports
    55: }
    56: 
    57: void
    58: connect_ex::iocp_op_finished( DWORD nbytes, ULONG_PTR udat,
    59:   LPOVERLAPPED olp, int err)
    60: {
    61:   waio_base::iocp_op_finished(nbytes, udat, olp, err);  // no connect-specific handling: record success and notify
    62: }
    63: 
    64: 
    65: bool
    66: wasync_transmit_file::start_async_op_impl()
    67: {
    68:   fprintf(stderr,"wasync_transmit_file: start async_op_impl\n");  // debug trace
    69:   return start_overlapped();  // hand back whatever start_overlapped() reports
    70: }
    71: 
    72: void
    73: wasync_transmit_file::iocp_op_finished( DWORD nbytes, ULONG_PTR udat,
    74:   LPOVERLAPPED olp, int err)
    75: {
    76:   waio_base::iocp_op_finished(nbytes, udat, olp, err);  // no transmit-specific handling: record success and notify
    77: }
    78: 
    79: bool
    80: wsa_socketio::start_async_op_impl()
    81: {
    82:   fprintf(stderr,"wsa_socketio: start async_op_impl\n");  // debug trace
    83:   return start_overlapped();    // start overlapped op and hand back its result
    84: }
    85: 
    86: // this could be factored into demux... or it might need
    87: // to stay here... this is really a finished that isn't finished
    88: // same goes for winfileio (I think)
    89: void
    90: wsa_socketio::iocp_op_finished( DWORD nbytes, ULONG_PTR udat,
    91:   LPOVERLAPPED olp, int err)
    92: {
    93:   // fprintf(stderr,"wsa_socketio wakeup, nb: %li, err: %i\n", nbytes, err );
    94: // Doing the handling myself - this can restart the op giving us
    95: // a possible race condition... or not? It should be sync with this call.
    96:   // wsasocketio_control_block::iocp_op_finished(nbytes, udat, olp, err);
    97: 
    98:   ppb->bytes_written += nbytes;   // accumulate progress across restarts
    99: 
   100:   // if we're not finished, we have to reinstall our request.
   101:   // stop on an error (waio_base turns it into success == false), on a
   102:   // zero byte completion (shutdown/closure, right?) or when the buffer is
   103:   // done. win32 gives WSAECONNRESET (pretty sure), not broken pipes, on write.
   104:   if(NO_ERROR != err || 0 == nbytes || ppb->finished())
   105:   {
   106:     // this'll wake us up
   107:     waio_base::iocp_op_finished(nbytes, udat, olp, err);
   108:   }
   109:   else
   110:   {
   111:     // go back around again
   112:     // this returns a finished flag (bad idea). it can also fail.
   113:     // NOTE(review): in that case nothing here wakes the caller - confirm.
   114:     if(start_overlapped())
   115:       fprintf(stderr, "socketio restart finished! WHAT TO DO!?!\n");
   116:   }
   117: }
   118: 
   119: // file io
   120: 
   121: bool
   122: winfile_io::start_async_op_impl()
   123: {
   124:   fprintf(stderr,"winfile_io: start async_op_impl\n");  // debug trace
   125:   return start_overlapped();  // go; hand back whatever start_overlapped() reports
   126: }
   127: 
   128: // this too could be factored back, I think. that waio_base finished
   129: // would have to change. it's just asking for some fn to realise
   130: // that it's finished.
   131: // the byte count shouldn't need to be updated here...
   132: void
   133: winfile_io::iocp_op_finished( DWORD nbytes, ULONG_PTR udat,
   134:   LPOVERLAPPED olp, int err)
   135: {
   136:   // fprintf(stderr,"winfile_io wakeup, nb: %li, err: %i\n", nbytes, err );
   137: 
   138:   // fprintf(stderr,"THIS WAKEUP SHOULD BE THE SAME AS SEND/RECV make it so\n");
   139:   // actually, I don't think it should be. the SOCKET stuff goes around
   140:   // the loop again (check its error handling doesn't hammer); file io never restarts.
   141: 
   142:   // keep track of bytes transferred (this path serves reads and writes alike).
   143:   pb.bytes_written += nbytes;
   144: 
   145:   waio_base::iocp_op_finished(nbytes, udat, olp, err);  // record success and wake the caller
   146: }
   147: 
   148: }}
   149: 
   150: 
End cpp section to faio/faio_winio.cpp[1]
Start felix section to lib/flx_faio_win32.flx[1 /1 ]
     1: #line 301 "./lpsrc/flx_winio.ipk"
     2: #import <flx.flxh>
     3: include "flx_faio";     // defines copipes & some driver interaction
     4: include "flx_demux";
     5: 
     6: module Faio_win32 {
     7: requires package "demux";
     8: requires package "faio";
     9: // contains windows overlapped/iocp io & copipes. no stream wrapper yet.
    10: open C_hack;
    11: open Faio;
    12: open Demux;
    13: 
    14: header '#include "faio_winio.hpp"'; // this has everything (includes asyncio.h)
    15: 
    16: // useful windows function
    17: fun GetLastError: 1 -> int = 'GetLastError()';
    18: 
    19: ctypes SOCKET;
    20: 
    21: typedef socket_t = SOCKET;
    22: 
    23: // maybe don't use this - let the socket be passed in already associated
    24: // with an IOCP. do I have to make this explicitly overlapped? If we
    25: // want async io I think I'll need to associate this with the iocp.
    26: fun cmk_socket : unit -> SOCKET = '::socket(AF_INET, SOCK_STREAM, IPPROTO_TCP)';
    27: 
    28: // well that didn't help.
    29: //fun cmk_socket : unit -> SOCKET = 'WSASocket(AF_INET, SOCK_STREAM, IPPROTO_TCP, NULL, 0, WSA_FLAG_OVERLAPPED)';
    30: // must associate with iocp to do overlapped io with s (WSASend/Recv)
    31: proc mk_socket(s: &SOCKET)
    32: {
    33:     *s = cmk_socket();
    34:     associate_with_iocp(*s);                // associate with iocp (errors?).
    35: }
    36: 
    37: 
    38: type wasync_accept = "flx::faio::wasync_accept";
    39: 
    40: fun mk_accept: demuxer *  SOCKET*SOCKET -> wasync_accept = 'flx::faio::wasync_accept($a)';
    41: // make this a parameterised type
    42: fun get_success[t]: t -> bool = '$1.success';
    43: 
    44: // this feels silly
    45: const INVALID_SOCKET: SOCKET = 'INVALID_SOCKET';
    46: // oops, no good if we can't check against it
    47: fun eq : SOCKET*SOCKET -> bool = '($1 == $2)';
    48: 
    49: // windows style accept. accepted is an already created socket, unbound
    50: proc Accept(success: &bool, listener: SOCKET, accepted: SOCKET)
    51: {
    52:     var acc = mk_accept(sys_demux,listener, accepted);
    53:     faio_req$ &acc;    // causes AcceptEx to be called
    54:     *success = get_success(acc);
    55: }
    56: 
    57: type connect_ex="flx::faio::connect_ex";
    58: fun mk_connect_ex: demuxer * SOCKET*charp*int -> connect_ex = 'flx::faio::connect_ex($a)';
    59: 
    60: // for use on sockets you make yourself, who knows, maybe you want to
    61: // reuse them
    62: proc Connect(success: &bool, s: SOCKET, addr: charp, port: int)
    63: {
    64:     var con = mk_connect_ex(sys_demux,s, addr, port);
    65:      faio_req$ &con;    // causes ConnectEx to be called
    66:     *success = get_success(con);
    67: }
    68: 
    69: proc Connect(s: &SOCKET, addr: charp, port: int)
    70: {
    71:     mk_socket s;            // error handling?
    72:     var success: bool;
    73:     Connect(&success, *s, addr, port);
    74:     // print "CONNECT success: "; print success; endl;
    75:     // error handling?
    76: }
    77: 
    78: // listens on all interfaces, I guess
    79: proc cmk_listener: lvalue[SOCKET]*lvalue[int]*int
    80:     = '$1 = flx::demux::create_listener_socket(&$2, $3);';
    81: 
    82: proc mk_listener(listener: &SOCKET, port: &int, backlog: int)
    83: {
    84:     *listener <- cmk_listener(*port, backlog);
    85:     associate_with_iocp(*listener);
    86: }
    87: 
    88: // ignores return value
    89: proc closesocket: SOCKET = 'closesocket($1);';
    90: 
    91: const SD_RECEIVE:int = 'SD_RECEIVE';
    92: const SD_SEND:int = 'SD_SEND';
    93: const SD_BOTH:int = 'SD_BOTH';
    94: 
    95: proc shutdown: SOCKET*int = 'shutdown($1, $2);';
    96: 
    97: type wasync_transmit_file = "flx::faio::wasync_transmit_file";
    98: 
    99: // I could just use HANDLEs everywhere, but I want to see how this goes
   100: type WFILE = 'HANDLE';
   101: typedef fd_t = WFILE;
   102: 
   103: const INVALID_HANDLE_VALUE: WFILE = 'INVALID_HANDLE_VALUE';
   104: fun eq : WFILE*WFILE -> bool = '($1 == $2)';
   105: 
   106: 
   107: // hacked for ro atm. the 0 means exclusive (not good, but I haven't deciphered
   108: // the flags yet. NULL for non inheritable security attributes.
   109: // OPEN_EXISTING is to make sure it doesn't create the file
   110: // Geez, FILE_ATTRIBUTE_NORMAL? not hidden, not temp, etc.
   111: // final NULL is for template file. not sure what it does, but I don't want it.
   112: // notice that it's opened for SHARED reading
   113: proc OpenFile: lvalue[WFILE]*string =
   114:   '$1 = CreateFile($2.c_str(), FILE_READ_DATA, FILE_SHARE_READ, NULL,\
   115:     OPEN_EXISTING, FILE_ATTRIBUTE_NORMAL | FILE_FLAG_OVERLAPPED, NULL);';
   116: 
   117: // basically for windows named pipes
   118: proc OpenFileDuplex: lvalue[WFILE]*string =
   119:   '$1 = CreateFile($2.c_str(), FILE_READ_DATA | FILE_WRITE_DATA,\
   120:      FILE_SHARE_READ | FILE_SHARE_WRITE, NULL, OPEN_EXISTING,\
   121:      FILE_ATTRIBUTE_NORMAL | FILE_FLAG_OVERLAPPED, NULL);';
   122: 
   123: proc CloseFile: WFILE = 'if(!CloseHandle($1))\
   124:   fprintf(stderr, "CloseHandle(WFILE) failed: %i\\n", GetLastError());';
   125: 
   126: // error handling?
   127: // proc CloseFile: WFILE = 'CloseHandle($1);';
   128: 
   129: fun mk_transmit_file : demuxer * SOCKET*WFILE -> wasync_transmit_file
   130:     = 'flx::faio::wasync_transmit_file($a)';
   131: 
   132: // toylike interface for now, but still fun
   133: proc TransmitFile(s: SOCKET, f: WFILE)
   134: {
   135:     var tf = mk_transmit_file(sys_demux,s, f);
   136:     faio_req$ &tf;
   137: }
   138: 
   139: // by passing special flags to TransmitFile we can transform a connected
   140: // socket into a socket ready for use with AcceptEx. DisconnectEx explicitly
   141: // does this and without the warning that accept-style & connect-style sockets
   142: // cannot be reused as the other type (which isn't a problem for my use)
   143: // however I already have TransmitFile code in place.
   144: fun mk_reuse_socket : demuxer * SOCKET -> wasync_transmit_file
   145:     = 'flx::faio::wasync_transmit_file($a)';
   146: 
   147: proc ReuseSocket(s: SOCKET)
   148: {
   149:     var tf = mk_reuse_socket(sys_demux,s);
   150:     faio_req$ &tf;
   151: }
   152: 
   153: type wsa_socketio = "flx::faio::wsa_socketio";
   154: gen mk_wsa_socketio: demuxer * SOCKET*sel_param_ptr*bool->wsa_socketio = 'flx::faio::wsa_socketio($a)';
   155: 
   156: private fun to_ptr : sel_param -> sel_param_ptr = '&$1';
   157: 
   158: 
   159: proc WSARecv(s: SOCKET, len: &int, buf: address, eof: &bool)
   160: {
   161:     var pb: sel_param;
   162:     init_pb(pb, buf, *len);
   163:     var ppb: sel_param_ptr = to_ptr pb;
   164: 
   165:     var rev = mk_wsa_socketio(sys_demux,s, ppb, true);  // reading
   166:     faio_req$ &rev;
   167: // we do have a success flag
   168:     calc_eof(ppb, len, eof);
   169: }
   170: 
   171: proc WSASend(s: SOCKET, len: &int, buf: address, eof: &bool)
   172: {
   173:     var pb: sel_param;
   174:     init_pb(pb, buf, *len);
   175:     var ppb: sel_param_ptr = to_ptr pb;
   176: 
   177:     var rev = mk_wsa_socketio(sys_demux,s, ppb, false); // writing
   178:     faio_req$ &rev;
   179:     calc_eof(ppb, len, eof);
   180: }
   181: 
   182: type winfile_io = "flx::faio::winfile_io";
   183: 
   184: fun mk_winfile_io: demuxer * WFILE*address*int*bool->winfile_io = 'flx::faio::winfile_io($a)';
   185: 
   186: // no offset - just for streams now. write probably doesn't work
   187: fun get_pb: winfile_io -> sel_param_ptr = '&$1.pb';
   188: 
   189: proc ReadFile(f: WFILE, len: &int, buf: address, eof: &bool)
   190: {
   191:     var io = mk_winfile_io(sys_demux, f, buf, *len, true); // reading
   192:     faio_req$ &io;
   193: // we do have a success flag
   194:     calc_eof(io.pb, len, eof);
   195: }
   196: 
   197: proc WriteFile(f: WFILE, len: &int, buf: address, eof: &bool)
   198: {
   199:     var io = mk_winfile_io(sys_demux, f, buf, *len, false);    // writing
   200:     faio_req$ &io;
   201:     calc_eof(io.pb, len, eof);
   202: }
   203: 
   204: 
   205: // general request for addition of socket to iocp. might be better to
   206: // just create them that way.
   207: type iocp_associator = "flx::faio::iocp_associator";
   208: fun mk_iocp_associator: demuxer * SOCKET -> iocp_associator = 'flx::faio::iocp_associator($a)';
   209: 
   210: // this ends up just casting to a handle, so I should be able to use
   211: // this for other HANDLEs. Note that the user cookie is not settable
   212: // via this interface.
   213: proc associate_with_iocp(s: SOCKET)
   214: {
   215:     // results? err code?
   216:     var req = mk_iocp_associator(sys_demux, s);
   217:     faio_req$ &req;
   218: }
   219: 
   220: } // module Faio_win32
   221: 
   222: 
End felix section to lib/flx_faio_win32.flx[1]
Start felix section to lib/flx_faio.flx[1 /1 ]
     1: #line 217 "./lpsrc/flx_faio.pak"
     2: #import <flx.flxh>
     3: 
     4: module Faio {
     5: requires package "demux";
     6: requires package "faio";
     7: 
     8: open C_hack;
     9: 
    10: proc faio_req[t](x:&t) {
    11:   val y : &address = reinterpret[&address] x;
    12:   svc (svc_general y);
    13: }
    14: 
    15: proc get_thread(thread: ptr[fthread]) {
    16:     svc (svc_get_fthread thread );
    17: }
    18: 
    19: header stdlib_h = '#include <stdlib.h>';            // malloc, free
    20: header = '#include "faio_asyncio.hpp"';
    21: header faio_timer_h = '#include "faio_timer.hpp"';
    22: 
    23: type sel_param = "flx::demux::sel_param";
    24: type sel_param_ptr = "flx::demux::sel_param*";
    25: 
    26: fun get_bytes_done : sel_param_ptr -> int = '$1->bytes_written';
    27: proc init_pb : sel_param*address*int
    28: = '{$1.buffer=(char*)$2;$1.buffer_size=$3;$1.bytes_written=0;}';
    29: 
    30: proc calc_eof(pb: sel_param_ptr, len: &int, eof: &bool)  // len: in = bytes requested, out = bytes done
    31: {
    32:     var bytes_done = pb.bytes_done;   // presumably resolves to get_bytes_done (bytes_written) - confirm
    33:     *eof = (bytes_done != *len);      // any short transfer is reported as eof
    34:     *len = bytes_done;                // pass back what was actually transferred
    35: }
    36: 
    37: type sleep_request = 'flx::faio::sleep_request' requires faio_timer_h;
    38: type alarm_clock = 'flx::demux::timer_queue*' requires faio_timer_h;
    39: 
    40: fun mk_alarm_clock: 1 -> alarm_clock = 'flx::demux::mk_timer_queue()';
    41: fun mk_sleep_request: alarm_clock * double -> sleep_request = 'flx::faio::sleep_request($1,$2)';
    42: 
    43: var sys_clock = mk_alarm_clock();
    44: 
    45: proc sleep(clock: alarm_clock, delta: double)
    46: {
    47:   var sr = mk_sleep_request$ clock,delta;
    48:   faio_req$ &sr;
    49: }
    50: 
    51: 
    52: } // module faio
    53: 
End felix section to lib/flx_faio.flx[1]
Start felix section to lib/flx_stream.flx[1 /1 ]
     1: #line 271 "./lpsrc/flx_faio.pak"
     2: #import <flx.flxh>
     3: 
     4: include "flx_faio";
     5: #if POSIX
     6: include "flx_faio_posix";
     7: #elif WIN32
     8: include "flx_faio_win32";
     9: #endif
    10: 
    11: module Flx_stream {
    12: requires package "demux";
    13: requires package "faio";
    14: 
    15: open Faio;
    16: 
    17: #if POSIX
    18: open Faio_posix;
    19: typedef fd_t = Faio_posix::fd_t;
    20: #elif WIN32
    21: open Faio_win32;
    22: #endif
    23: 
    24: union devnull_t = DEVNULL;
    25: 
    26: typeclass IByteStream[s] {
    27:   virtual proc read: s * &int * address * &bool;
    28: }
    29: 
    30: typeclass OByteStream[s] {
    31:   virtual proc write: s * &int * address * &bool;
    32: }
    33: 
    34: typeclass IOByteStream[s] {
    35:   inherit IByteStream[s];
    36:   inherit OByteStream[s];
    37: }
    38: 
    39: typeclass TerminalIByteStream[s] {
    40:   inherit IByteStream[s];
    41:   virtual proc iclose: s;
    42: }
    43: 
    44: typeclass TerminalOByteStream[s] {
    45:   inherit OByteStream[s];
    46:   virtual proc oclose: s;
    47: }
    48: 
    49: typeclass TerminalIOByteStream[s] {
    50:   inherit TerminalIByteStream[s];
    51:   inherit TerminalOByteStream[s];
    52:   virtual proc ioclose: s;
    53: }
    54: 
    55: //
    56: // devnull_t
    57: //
    58: instance IByteStream[devnull_t]
    59: {
    60:   proc read(strm: devnull_t, len: &int, buf: address, eof: &bool)
    61:     { *len = 0; *eof = true; }
    62: }
    63: 
    64: instance OByteStream[devnull_t]
    65: {
    66:   proc write(strm: devnull_t, len: &int, buf: address, eof: &bool)
    67:     { *eof = false; }
    68: }
    69: 
    70: instance IOByteStream[devnull_t] {}
    71: instance TerminalIByteStream[devnull_t] { proc iclose (x:devnull_t) { } }
    72: instance TerminalOByteStream[devnull_t] { proc oclose (x:devnull_t) { } }
    73: instance TerminalIOByteStream[devnull_t] { proc ioclose (x:devnull_t) { } }
    74: 
    75: 
    76: //
    77: // fd_t -- native file handle (disk file)
    78: //
    79: instance IByteStream[fd_t]
    80: {
    81:   proc read(s: fd_t, len: &int, buf: address, eof: &bool)
    82:   #if POSIX
    83:     { faio_read(s, len, buf, eof); }
    84:   #elif WIN32
    85:     { ReadFile(s, len, buf, eof); }
    86:   #endif
    87: }
    88: 
    89: instance OByteStream[fd_t]
    90: {
    91:   proc write(s: fd_t, len: &int, buf: address, eof: &bool)
    92:   #if POSIX
    93:     { faio_write(s, len, buf, eof); }
    94:   #elif WIN32
    95:     { WriteFile(s, len, buf, eof); }
    96:   #endif
    97: }
    98: 
    99: instance IOByteStream[fd_t] {}
   100: 
   101: instance TerminalIByteStream[fd_t]
   102: {
   103:   proc iclose (f:fd_t)
   104:   #if POSIX
   105:     { close f; }
   106:   #elif WIN32
   107:     { CloseFile f; }
   108:   #endif
   109: }
   110: 
   111: instance TerminalOByteStream[fd_t]
   112: {
   113:   proc oclose (f:fd_t)
   114:   #if POSIX
   115:     { close f; }
   116:   #elif WIN32
   117:     { CloseFile f; }
   118:   #endif
   119: }
   120: 
   121: instance TerminalIOByteStream[fd_t]
   122: {
   123:   proc ioclose (f:fd_t)
   124:   #if POSIX
   125:     { close f; }
   126:   #elif WIN32
   127:     { CloseFile f; }
   128:   #endif
   129: }
   130: 
   131: proc cat[istr,ostr with IByteStream[istr], OByteStream[ostr]]
   132: (infd: istr , outfd: ostr, buf: address, bufsize: int)  // copies infd to outfd through the caller's buf
   133: {
   134:     var eof = false;    // set by read
   135:     var weof = false;   // set by write
   136:     var len: int;       // in/out byte count shared by read and write
   137: 
   138:     // if we finish input, stop. if output eofs, don't keep hammering on it!
   139:     while{not(eof) and not(weof)} {
   140:         len = bufsize;                  // ask for a full buffer each time round
   141:         read(infd, &len, buf, &eof);    // len now holds the bytes actually read
   142:         //fprint$ cerr,q"catted in $len bytes, eof=$eof\n";
   143:         //fprint$ cerr, string(C_hack::cast[charp] buf,len);
   144:         write(outfd, &len, buf, &weof); // write exactly what was read
   145:         //fprint$ cerr,q"catted out $len bytes, eof=$weof\n";
   146:     };
   147: }
   148: 
   149: // It's very unfortunate that memcmp doesn't return the position
   150: // of the first non-equality
   151: 
   152: proc stream_cmp[istr1,istr2 with IByteStream[istr1], IByteStream[istr2]]
   153: (fd1: istr1 , fd2: istr2, buf1: address, buf2: address, bufsize: int, sign: &int)
   154: {
   155:   var eof1 = false;
   156:   var eof2 = false;
   157:   var len1: int;
   158:   var len2: int;
   159:   var terminated = false;
   160:   var cmp = 0;
   161: 
   162:   while{cmp == 0 and not terminated} {
   163:     len1 = bufsize; read(fd1, &len1, buf1, &eof1);
   164:     len2 = bufsize; read(fd2, &len2, buf2, &eof2);
   165: //print "Len1="; print len1; endl;
   166: //print "Len2="; print len2; endl;
   167: 
   168:     len  := min(len1,len2);
   169:     cmp = Carray::memcmp(buf1, buf2, size len);
   170: 
   171:     if cmp == 0 do
   172:       cmp = len1 - len2;
   173:       if cmp == 0 do
   174:         terminated = eof1 and eof2;
   175:         cmp =
   176:           // ugg: false = case 0, true = case 1
   177:           match eof1, eof2 with
   178:           | case 1, case 1 => 0
   179:           | case 0, case 0 => 0
   180:           | case 0, case 1 => 1
   181:           | case 1, case 0 => -1
   182:           endmatch
   183:         ;
   184:       done;
   185:     done;
   186:   };
   187:   *sign = cmp;
   188: }
   189: 
   190: 
   191: proc cmp
   192:   [istr1, istr2 with IByteStream[istr1], IByteStream[istr2]]
   193:   (i1: istr1, i2: istr2, res: &int)
   194: {
   195:   val BUFSIZE = 100000;
   196:   var buf1 = C_hack::malloc(BUFSIZE);
   197:   var buf2 = C_hack::malloc(BUFSIZE);
   198:   stream_cmp(i1, i2, buf1, buf2, BUFSIZE, res);
   199:   C_hack::free(buf1);
   200:   C_hack::free(buf2);
   201: }
   202: 
   203: proc cat[istr,ostr with IByteStream[istr], OByteStream[ostr]]
   204: (infd: istr, outfd: ostr)
   205: {
   206:     val BUFSIZE = 100000;
   207:     var buf = C_hack::malloc(BUFSIZE);
   208: 
   209:     // that's some nice error checking
   210:     cat(infd, outfd, buf, BUFSIZE);
   211:     C_hack::free(buf);
   212: }
   213: 
   214: open List;
   215: 
   216: proc cat[istr,ostr with IByteStream[istr], OByteStream[ostr]]
   217: (in_fds: list[istr], outfd: ostr,
   218:     buf: address, bufsize: int)
   219: {
   220:     match in_fds with
   221:     | Empty[istr] => {}                     // finished
   222:     | Cons[istr] (?fd, ?l) =>
   223:         {
   224:             cat(fd, outfd, buf, bufsize);   // cat first
   225:             cat(l, outfd, buf, bufsize);    // cat the rest
   226:         }
   227:     endmatch
   228:     ;
   229: }
   230: 
   231: proc echo[iostr with IOByteStream[iostr]]
   232: (fd: iostr, buf: address, bufsize: int)
   233: {
   234:     // echo a = cat a a. that's deep, man.
   235:     cat(fd, fd, buf, bufsize);
   236: };
   237: 
   238: proc tee[istr,ostr with IByteStream[istr], OByteStream[ostr]]
   239: (infd: istr, outfd: ostr, outfd2: ostr)  // copies infd to both outfd and outfd2
   240: {
   241:     var eof = false;
   242:     var weof = false;
   243:     var weof2 = false;
   244:     var len: int; var len2: int;    // one in/out byte count per output stream
   245: 
   246:     val BUFSIZE = 10*1024;
   247:     var buf = C_hack::malloc(BUFSIZE);
   248: 
   249:     // don't hammer! stop as soon as the input or either output reports eof
   250:     while{not(eof) and not(weof) and not(weof2)} {
   251:         len = BUFSIZE;
   252:         read(infd, &len, buf, &eof); len2 = len;   // write overwrites its count, so keep a copy
   253:         write(outfd, &len, buf, &weof);
   254:         write(outfd2, &len2, buf, &weof2);         // a short write to outfd must not truncate outfd2
   255:     };
   256:     C_hack::free buf;
   257: }
   258: 
   259: // highly inefficient!
   260: proc get_line[istr with IByteStream[istr]]
   261: (strm: istr, s: &string)  // reads up to (not including) the next '\n' or eof into *s
   262: {
   263:   var c: char;                                      // one byte landing area for read
   264:   val ac = C_hack::cast[address]$ C_hack::addr c;   // address of c, in the form read expects
   265:   var st: string;                                   // line accumulated so far
   266:   var finished = false;
   267: 
   268:   whilst not finished do
   269:     var len = 1;      // one byte per read call
   270:     var eof: bool;    // set by read
   271: 
   272:     read(strm, &len, ac, &eof);
   273: 
   274:     if eof or c == char '\n' do   // the newline itself is not appended
   275:       finished = true;
   276:     else
   277:       st += c;
   278:     done;
   279:   done;
   280:   *s = st;  // pass back result
   281: }
   282: 
   283: proc write_string[ostr with OByteStream[ostr]]
   284: (sk: ostr, var s: string)
   285: {
   286:   var slen = len s;
   287:   var a = C_hack::cast[address]$ cstr s;
   288:   var eof: bool;
   289:   write(sk, &slen, a, &eof);
   290: }
   291: 
   292: 
   293: } // module Flx_stream
   294: 
End felix section to lib/flx_stream.flx[1]
Start felix section to lib/flx_socket.flx[1 /1 ]
     1: #line 566 "./lpsrc/flx_faio.pak"
     2: #import <flx.flxh>
     3: include "flx_faio";
     4: include "flx_stream";
     5: 
     6: #if POSIX
     7: include "flx_faio_posix";
     8: #elif WIN32
     9: include "flx_faio_win32";
    10: #endif
    11: 
    12: module Flx_socket {
    13: 
    14: #if POSIX
    15: typedef socket_t = Faio_posix::socket_t;
    16: #elif WIN32
    17: typedef socket_t = Faio_win32::socket_t;
    18: #endif
    19: 
    20: 
    21: proc mk_listener(l: &socket_t, port: &int, qlen: int)
    22: {
    23: #if POSIX
    24:   Faio_posix::mk_listener(l,port, qlen);
    25: #elif WIN32
    26:   Faio_win32::mk_listener(l, port, qlen);
    27: #endif
    28: }
    29: 
    30: proc accept(l: socket_t, s: &socket_t)  // accepts a connection on listener l into *s
    31: {
    32: #if POSIX
    33:   Faio_posix::accept(s, l);  // success or not? error checking
    34: #elif WIN32
    35:   var success: bool;
    36:   Faio_win32::mk_socket(s);  // error check? (AcceptEx wants the accepting socket created up front)
    37:   Faio_win32::Accept(&success, l, *s);
    38:   if not success do
    39:     fprint (cerr, "Accept failed! num?\n");  // diagnostics go to cerr, not the program's output
    40:   done;  // NOTE(review): on failure *s still holds the socket from mk_socket; nothing closes it here
    41: #endif
    42: }
    43: 
    44: proc shutdown(s: socket_t, how: int)
    45: {
    46: #if POSIX
    47:   Faio_posix::shutdown(s, how);
    48: #elif WIN32
    49:   Faio_win32::shutdown(s, how);
    50: #endif
    51: }
    52: 
    53: proc connect(s: &socket_t, addr: charp, port: int)
    54: {
    55: #if POSIX
    56:     Faio_posix::connect(s, addr, port );
    57: #elif WIN32
    58:     Faio_win32::Connect( s, addr, port );
    59: #endif
    60: }
    61: 
    62: 
    63: //
    64: // socket_t
    65: //
    66: instance Flx_stream::IByteStream[socket_t]
    67: {
    68:   proc read(s: socket_t, len: &int, buf: address, eof: &bool)
    69:   #if POSIX
    70:     { Faio_posix::async_read(s, len, buf, eof); }
    71:   #elif WIN32
    72:     { Faio_win32::WSARecv(s, len, buf, eof); }
    73:   #endif
    74: }
    75: 
    76: instance Flx_stream::OByteStream[socket_t]
    77: {
    78:   proc write(s: socket_t, len: &int, buf: address, eof: &bool)
    79:   #if POSIX
    80:     { Faio_posix::async_write(s, len, buf, eof); }
    81:   #elif WIN32
    82:     { Faio_win32::WSASend(s, len, buf, eof); }
    83:   #endif
    84: }
    85: 
    86: instance Flx_stream::IOByteStream[socket_t] {}
    87: 
    88: instance Flx_stream::TerminalIByteStream[socket_t]
    89: {
    90:   proc iclose (s:socket_t)  // closes the input side of s
    91:   #if POSIX
    92:     { Faio_posix::shutdown (s,0); Faio_posix::close s; }
    93:   #elif WIN32
    94:     { Faio_win32::closesocket s; }
    95:   #endif
    96: }
    97: 
    98: instance Flx_stream::TerminalOByteStream[socket_t]
    99: {
   100:   proc oclose (s:socket_t)
   101:   #if POSIX
   102:     { Faio_posix::shutdown (s,1); Faio_posix::close s; }
   103:   #elif WIN32
   104:     { Faio_win32::closesocket s; }
   105:   #endif
   106: }
   107: 
   108: instance Flx_stream::TerminalIOByteStream[socket_t]
   109: {
   110:   proc ioclose (s:socket_t)
   111:   #if POSIX
   112:     {
   113:       //fprint (cerr,q"STREAM:Closing socket $s\n");
   114:       Faio_posix::shutdown(s,2);
   115:       Faio::sleep (Faio::sys_clock,5.0);
   116:       /*
   117:       var len = 1; var eof = false; var buf = C_hack::malloc(1);
   118:       Faio_posix::async_read(s, &len, buf, &eof);
   119:       fprint (cerr,q"STREAM:socket $s, eof=$eof\n");
   120:       Faio_posix::shutdown(s,0);
   121:       */
   122:       Faio_posix::close s;
   123:     }
   124:   #elif WIN32
   125:     { Faio_win32::closesocket s; }
   126:   #endif
   127: }
   128: 
   129: }
   130: 
End felix section to lib/flx_socket.flx[1]