5. Sockets

Start cpp section to faio/faio_posixio.hpp[1 /1 ]
     1: #line 870 "./lpsrc/flx_faio.pak"
     2: #ifndef __FAIO_POSIXIO__
     3: #define __FAIO_POSIXIO__
     4: #include <flx_faio_config.hpp>
     5: 
     6: #include "faio_asyncio.hpp"
     7: #include "faio_drv.hpp"
     8: 
     9: // we don't need to piggyback much data at all. for now just the demuxer,
    10: // so that we can be woken up, and the buffer info (this replaces the
    11: // felix "socket" thread type, which was ugly.
    12: 
    13: #include "demux_posix_demuxer.hpp"
    14: 
    15: // a new sort of demuxer/event source: file io completions
    16: // haven't given up on using the socket style demuxers yet.
    17: #include "demux_pfileio.hpp"
    18: 
    19: #include "demux_timer_queue.hpp"
    20: 
    21: namespace flx { namespace faio {
    22: 
    23: class FAIO_EXTERN socketio_wakeup : public demux::socket_wakeup {
    24: public:
    25:   demux::sel_param   pb;     // in: what you want, out: what you get
    26:   thread_wakeup fw;
    27: 
    28:   int       sio_flags;  // either one of PDEMUX_{READ|WRITE}
    29: 
    30:   virtual void wakeup(demux::posix_demuxer& demux);
    31: };
    32: 
    33: // this can handle most unix style io, that is, read & write on sockets,
    34: // files & pipes. NICE. the fact that the socket is now in here may mean
    35: // I can get rid of the epoll hack
    36: // Not sure if this can be used for file fds.
    37: class FAIO_EXTERN socketio_request : public flx_driver_request_base {
    38: public:
    39:     socketio_wakeup sv;
    40: 
    41:     socketio_request() {}       // Lord Felix demands it. Like STL.
    42: 
    43:     socketio_request(int s, char* buf, long len, bool r);
    44:     virtual bool start_async_op(demux::demuxer& demux, flx_drv* drv,  void* f);
    45: };
    46: 
    47: class FAIO_EXTERN connect_request
    48:   : public flx_driver_request_base, public demux::connect_control_block {
    49: public:
    50:   thread_wakeup fw;
    51: 
    52:   connect_request() {}      // flx linkage
    53: 
    54:   connect_request(const char* addr, int port) { addy = addr; p = port; s=-1; }
    55:   virtual bool start_async_op(demux::demuxer& demux, flx_drv* drv,  void* f);
    56:   virtual void wakeup(demux::posix_demuxer& demux);
    57: };
    58: 
    59: class FAIO_EXTERN accept_request
    60:   : public flx_driver_request_base, public demux::accept_control_block {
    61: public:
    62:   // we sometimes know that there'll be several connections to accept.
    63:   // this'll need a different wakeup - and a different interface between
    64:   // event source & wakeups
    65:   thread_wakeup fw;
    66: 
    67:   accept_request() {} // flx linkage
    68: 
    69:   // eeh, give that a better name
    70:   accept_request(int listener) { s = listener; }
    71: 
    72:   // from flx_driver_request_base
    73:   virtual bool start_async_op(demux::demuxer& demux, flx_drv* drv, void* f);
    74: 
    75:   // from accept_control_block
    76:   virtual void wakeup(demux::posix_demuxer& demux);
    77: };
    78: 
    79: 
    80: // separate pthread file io
    81: // hum. multiple inheritance
    82: class FAIO_EXTERN flxfileio_request
    83:     : public flx_driver_request_base, public demux::fileio_request
    84: {
    85:     thread_wakeup   fw;
    86: public:
    87:     flxfileio_request();           // flx linkage
    88:     ~flxfileio_request();          // flx linkage
    89: 
    90:     flxfileio_request(int f, char* buf, long len, long off, bool rd)
    91:         : fileio_request(f, buf, len, off, rd) {}
    92: 
    93:     // from driver request
    94:     virtual bool start_async_op(demux::demuxer& demux, flx_drv* drv, void* f);
    95: 
    96:     // from async io thread
    97:     virtual void finished();
    98: };
    99: 
   100: }}
   101: #endif
End cpp section to faio/faio_posixio.hpp[1]
Start cpp section to faio/faio_wdrv.hpp[1 /1 ]
     1: #line 972 "./lpsrc/flx_faio.pak"
     2: 
     3: #ifndef __FAIO_WDRV__
     4: #define __FAIO_WDRV__
     5: 
     6: #include "demux_win_timer_queue.hpp"
     7: #include "faio_drv.hpp"
     8: 
     9: namespace flx { namespace faio {
    10: 
    11: class FAIO_EXTERN wflx_drv : public flx_drv {
    12:   demux::win_timer_queue sleepers;
    13: public:
    14:   wflx_drv(flx::pthread::sleep_queue_t& q, int n1, int m1, int n2, int m2);
    15: 
    16:   demux::timer_queue* get_sleepers() { return &sleepers; }
    17: };
    18: 
    19: }}
    20: 
    21: #endif
    22: 
End cpp section to faio/faio_wdrv.hpp[1]
Start cpp section to faio/faio_wdrv.cpp[1 /1 ]
     1: #line 995 "./lpsrc/flx_faio.pak"
     2: 
     3: #include <Windows.h>  // see note in win_timer_queue.hpp
     4: #include "faio_wdrv.hpp"
     5: 
     6: namespace flx { namespace faio {
     7: 
     8:   // vs requirement that I have a cpp file?
     9:   wflx_drv::wflx_drv(flx::pthread::sleep_queue_t& q,int n1,int m1,int n2, int m2) : flx_drv(q,n1,m1) {}
    10: 
    11: }}
    12: 
End cpp section to faio/faio_wdrv.cpp[1]
Start cpp section to faio/faio_posixio.cpp[1 /1 ]
     1: #line 1008 "./lpsrc/flx_faio.pak"
     2: #include <stdio.h>      // printf
     3: #include "faio_posixio.hpp"
     4: #include "demux_sockety.hpp"    // async_connect
     5: 
     6: // pthread async file io stuff
     7: #include "faio_pdrv.hpp"
     8: 
     9: 
    10: #include <sys/types.h>  // getsockopt & co
    11: #include <sys/socket.h>
    12: 
    13: #include <unistd.h>     // close
    14: #include <string.h>     // strerror - probably not portable
    15: #include <assert.h>
    16: 
    17: using namespace flx::demux;
    18: namespace flx { namespace faio {
    19: socketio_request::socketio_request(int s, char* buf, long len, bool read)
    20: {
    21:     sv.s = s;
    22:   // demux supports reading AND writing. We don't. Yet.
    23:   sv.sio_flags = ((read) ? PDEMUX_READ : PDEMUX_WRITE);
    24: 
    25:     sv.pb.buffer = buf;
    26:     sv.pb.buffer_size = len;
    27:     sv.pb.bytes_written = 0;        // really bytes_processed
    28: }
    29: 
    30: bool
    31: socketio_request::start_async_op(demux::demuxer& demux, flx_drv* drv, void* f)
    32: {
    33:     // fprintf(stderr, "adding wakeup: len %i, done %i\n",
    34:     //   sv.pb.buffer_size, sv.pb.bytes_written);
    35: 
    36:     // important: copy down the thread and the driver
    37:     RECORD_THREAD_INFO(sv.fw);
    38: 
    39:     posix_demuxer*  pd = (posix_demuxer*)&demux;
    40: 
    41:     // wake thread if call failed
    42:   return (pd->add_socket_wakeup(&sv, sv.sio_flags) == -1);
    43: }
    44: 
    45: 
    46: void
    47: socketio_wakeup::wakeup(posix_demuxer& demux)
    48: {
    49:     // handle read/write, return true if not finished.
    50:     // otherwise wakeup return false.
    51:   bool  connection_closed;
    52: 
    53:   //fprintf(stderr,"prehandle wakeup, this: %p, read: %i, len: %i, done %i\n",
    54:   //  this, read, pb.buffer_size, pb.bytes_written);
    55: 
    56:   // NOTE: this code does not handle the possibility of both read AND
    57:   // write being set. That would require thinking about the what
    58:   // the connect_closed return value meant. In any case, we don't
    59:   // do that stuff here yet.
    60:   if(wakeup_flags & PDEMUX_READ)
    61:   {
    62:     // just check that our above assumption hasn't been violated.
    63:     assert(wakeup_flags == PDEMUX_READ);
    64:         connection_closed = posix_demuxer::socket_recv(s, &pb);
    65:     }
    66:     else
    67:     {
    68:     // never hurts to be paranoid.
    69:     assert(wakeup_flags == PDEMUX_WRITE);
    70:         connection_closed = posix_demuxer::socket_send(s, &pb);
    71:     }
    72: 
    73:   // fprintf(stderr,"posthandle wakeup, this: %p, read: %i, len: %i, done %i\n",
    74:   //  this, read, pb.buffer_size, pb.bytes_written);
    75:   // fprintf(stderr,"wakeup of %p, closed = %i\n", this, connection_closed);
    76: 
    77:     // wake up: time to process some data
    78:     if(connection_closed || pb.bytes_written == pb.buffer_size)
    79:     {
    80:     // fprintf(stderr,"schedding %p, drv: %p, f: %p\n", this, drv, f);
    81:         //drv->sched(f);
    82:         fw.wake();
    83:         return;
    84:     }
    85: 
    86:   // fprintf(stderr,"not schedding %p\n", this);
    87:   if(demux.add_socket_wakeup(this, sio_flags) == -1)
    88:         fprintf(stderr,"failed to re-add_socket_wakeup\n");
    89: }
    90: 
    91: // asynchronous connect
    92: bool
    93: connect_request::start_async_op(demuxer& demux, flx_drv* drv, void* f)
    94: {
    95: // fprintf(stderr, "connect_request::start_async_op\n");
    96:     // important: copy down the thread and the driver
    97:   RECORD_THREAD_INFO(fw);
    98: 
    99:   posix_demuxer*  pd = (posix_demuxer*)&demux;
   100: 
   101:   // call failed or finished (!), wake up thread as no wakeup coming
   102:   if(start(*pd) == -1) return true;
   103: 
   104:   // NONONONONO! Referring to this's variables after a successful start
   105:   // gives rise to a race condition, which is bad.
   106:   return false;     // do not reschedule after a successful start
   107: 
   108: /*
   109:   // I've not seen this yet, don't know why.
   110:   if(0 == socket_err) fprintf(stderr, "WOW, instant CONNECT\n");
   111: 
   112:   // call didn't fail, could be pending or finished.
   113:   // return socket_err != EINPROGRESS, the contrapositive, sort of
   114:   return 0 == socket_err;   // no err => finished immediately
   115: */
   116: }
   117: 
   118: void
   119: connect_request::wakeup(posix_demuxer& demux)
   120: {
   121: //fprintf(stderr, "connect_request::wakeup\n");
   122: 
   123:   // fprintf(stderr,"connect woke up\n");
   124:   connect_control_block::wakeup(demux);
   125: 
   126:   // felix thread can pick out error itself.
   127:   fw.wake();
   128: }
   129: 
   130: 
   131: // async accept
   132: bool
   133: accept_request::start_async_op(demuxer& demux, flx_drv* drv, void* f)
   134: {
   135:     // important: copy down the thread and the driver
   136:   RECORD_THREAD_INFO(fw);
   137: 
   138:   posix_demuxer*  pd = (posix_demuxer*)&demux;
   139:     return (start(*pd) == -1);      // accept_control_block function
   140: }
   141: 
   142: void
   143: accept_request::wakeup(posix_demuxer& demux)
   144: {
   145:   // does the leg work.
   146:   accept_control_block::wakeup(demux);
   147: 
   148:   if(accepted == -1)
   149:   {
   150:     // I don't know if this is a good idea...
   151:     fprintf(stderr, "accept request failed (%i), retrying...\n",
   152:       socket_err);
   153:     // didn't get it - go back to sleep
   154:     if(start(demux) == -1)
   155:       fprintf(stderr, "failed again... probably was a bad idea\n");
   156:     return;
   157:   }
   158: 
   159:   fw.wake();
   160: }
   161: 
   162: // from driver request
   163: flxfileio_request::~flxfileio_request(){}
   164: flxfileio_request::flxfileio_request(){}
   165: 
   166: 
   167: bool
   168: flxfileio_request::start_async_op(demux::demuxer& demux, flx_drv* drv, void* f)
   169: {
   170:     // printf("driver called fileio start_async_op code\n");
   171:     // important: copy down the thread and the driver
   172:     RECORD_THREAD_INFO(fw);
   173: 
   174:     // need to create the async io thing here, or ask the driver for it
   175:     // driver needs to go a little less portable
   176:     pflx_drv*   pdrv = (pflx_drv*)drv;
   177:     pdrv->get_aio_worker()->add_fileio_request(this);
   178: 
   179:     return false;       // no wakeup
   180: }
   181: 
   182: // from async io thread
   183: void
   184: flxfileio_request::finished()
   185: {
   186:     // fprintf(stderr,"async fileio got finished callback\n");
   187:     fw.wake();    // wake thread
   188: }
   189: 
   190: 
   191: }}
   192: 
End cpp section to faio/faio_posixio.cpp[1]
Start cpp section to faio/faio_winio.hpp[1 /1 ]
     1: #line 1201 "./lpsrc/flx_faio.pak"
     2: #ifndef __DWINIO__
     3: #define __DWINIO__
     4: #include <flx_faio_config.hpp>
     5: 
     6: // visual studio is quite sensitve about how you do these includes.
     7: // THIS is the way (WinSock2.h must include Windows.h).
     8: #include <WinSock2.h>
     9: #include <MSWSock.h>        // AcceptEx, TF_REUSE_SOCKET, etc
    10: 
    11: #include "faio_asyncio.hpp" // flx driver requests
    12: #include "demux_overlapped.hpp"   // nicely wrapped async windows calls
    13: 
    14: namespace flx { namespace faio {
    15: 
    16: // interestingly, because in windows the async objects are associated
    17: // with an IOCP before their use, we don't actually need a demuxer here
    18: // at all. That's kind of nice. (actually iocp_associator uses it now)
    19: 
    20: // a flx driver request to the add socket s to the drivers iocp
    21: // this is currently the only windows driver request that uses the demuxer.
    22: class FAIO_EXTERN iocp_associator : public flx_driver_request_base {
    23:   SOCKET  s;
    24: public:
    25:   // should have result & errcode
    26:   iocp_associator() {}
    27:   iocp_associator(SOCKET associatee) : s(associatee) {}
    28: 
    29:   virtual bool start_async_op(demux::demuxer& demux, flx_drv* drv, void* f);
    30: };
    31: 
    32: // flx <-> c++ stuff for async io (well, it was)
    33: 
    34: // transition to new windows async control block
    35: class FAIO_EXTERN waio_base : public flx_driver_request_base {
    36: protected:
    37:   thread_wakeup fw;
    38: public:
    39:   bool  success;          // eh?
    40: 
    41:   waio_base() : success(false) {}
    42: 
    43:   // actually wakes up thread
    44:   virtual void iocp_op_finished( DWORD nbytes, ULONG_PTR udat,
    45:     LPOVERLAPPED olp, int err);
    46: };
    47: 
    48: 
    49: // listener socket must be already associated with an IOCP
    50: // in doing an AcceptEx, it might succeed immediately - do you still
    51: // get the IOCP wakeup?
    52: class FAIO_EXTERN wasync_accept
    53:   : public waio_base, public demux::acceptex_control_block
    54: {
    55: public:
    56:   wasync_accept() {}  // felix linkage demands it
    57: 
    58:   wasync_accept(SOCKET l, SOCKET a) { listener = l; acceptor = a; }
    59: 
    60:   virtual bool start_async_op(demux::demuxer& demux, flx_drv* drv, void* f);
    61: 
    62:   virtual void iocp_op_finished( DWORD nbytes, ULONG_PTR udat,
    63:     LPOVERLAPPED olp, int err);
    64: };
    65: 
    66: class FAIO_EXTERN connect_ex
    67:   : public waio_base, public demux::connectex_control_block
    68: {
    69: public:
    70: 
    71:   connect_ex() {}     // flx linkage
    72: 
    73:   connect_ex(SOCKET soc, const char* addr, int port)
    74:     { s = soc; addy = addr; p = port; }
    75: 
    76:   virtual bool start_async_op(demux::demuxer& demux, flx_drv* drv, void* f);
    77: 
    78:   virtual void iocp_op_finished( DWORD nbytes, ULONG_PTR udat,
    79:     LPOVERLAPPED olp, int err);
    80: };
    81: 
    82: // TransmitFile here (requires file handle)
    83: class FAIO_EXTERN wasync_transmit_file
    84:   : public waio_base, public demux::transmitfile_control_block
    85: {
    86: public:
    87:   wasync_transmit_file()
    88:     : transmitfile_control_block(INVALID_SOCKET, NULL) {}   // flx linkage
    89: 
    90:   wasync_transmit_file(SOCKET dst)      // for reuse of socket
    91:     : transmitfile_control_block(dst) {}
    92: 
    93:   wasync_transmit_file(SOCKET dst, HANDLE src)  // actual transmitfile
    94:     : transmitfile_control_block(dst, src) {}
    95: 
    96:   // from flx_request_base
    97:   virtual bool start_async_op(demux::demuxer& demux, flx_drv* drv, void* f);
    98: 
    99:   virtual void iocp_op_finished(DWORD nbytes, ULONG_PTR udat,
   100:     LPOVERLAPPED olp, int err);
   101: };
   102: 
   103: // handles both WSASend & WSARecv
   104: class FAIO_EXTERN wsa_socketio
   105:   : public waio_base, public demux::wsasocketio_control_block
   106: {
   107: public:
   108:   wsa_socketio()
   109:     : wsasocketio_control_block(INVALID_SOCKET, NULL, false) {}
   110: 
   111:   wsa_socketio(SOCKET src, demux::sel_param* ppb, bool read)
   112:     : wsasocketio_control_block(src, ppb, read) {}
   113: 
   114:   virtual bool start_async_op(demux::demuxer& demux, flx_drv* drv, void* f);
   115: 
   116:   virtual void iocp_op_finished( DWORD nbytes, ULONG_PTR udat,
   117:     LPOVERLAPPED olp, int err);
   118: };
   119: 
   120: // looks a bit like wsa_socketio (bad name, sends too)
   121: class FAIO_EXTERN winfile_io
   122:   : public waio_base, public demux::winfileio_control_block
   123: {
   124: public:
   125:   winfile_io()      // flx linkage
   126:     : winfileio_control_block(NULL, NULL, 0, false){}
   127: 
   128:   // offset?
   129:   winfile_io(HANDLE f, void* buf, int len, bool read)
   130:     : winfileio_control_block(f, buf, len, read) {}
   131: 
   132:   virtual bool start_async_op(demux::demuxer& demux, flx_drv* drv, void* f);
   133: 
   134:   virtual void iocp_op_finished( DWORD nbytes, ULONG_PTR udat,
   135:     LPOVERLAPPED olp, int err);
   136: };
   137: 
   138: 
   139: }}
   140: #endif  // __DWINIO__
   141: 
End cpp section to faio/faio_winio.hpp[1]
Start cpp section to faio/faio_winio.cpp[1 /1 ]
     1: #line 1343 "./lpsrc/flx_faio.pak"
     2: #include "faio_winio.hpp"
     3: #include <stdio.h>      // printf
     4: using namespace flx::demux;
     5: namespace flx { namespace faio {
     6: 
     7: // way of adding sockets to the IOCP.
     8: bool
     9: iocp_associator::start_async_op(demuxer& demux, flx_drv* drv, void* f)
    10: {
    11:   iocp_demuxer* iod = (iocp_demuxer*)&demux;
    12:   // nasty: note how I'm making the user cookie constant (0).
    13:   if(iod->associate_with_iocp((HANDLE)s, 0) != 0)
    14:     fprintf(stderr,"associate request failed - get result here!\n");
    15: 
    16:   return true;      // wake caller
    17: }
    18: 
    19: void
    20: waio_base::iocp_op_finished( DWORD nbytes, ULONG_PTR udat,
    21:   LPOVERLAPPED olp, int err)
    22: {
    23:   // fprintf(stderr,"general wakeup thing - rescheduling\n");
    24:   //fprintf(stderr,"this: %p, q: %p, f: %p, err: %i\n", this, q, f, err);
    25: 
    26:   // this tells us when things went wrong (store it)
    27:   if(NO_ERROR != err)
    28:     fprintf(stderr,"catchall wakeup got error: %i (should store it)\n", err);
    29: 
    30:   success = (NO_ERROR == err);  // this works pretty well
    31:   fw.wake();
    32: }
    33: 
    34: bool
    35: wasync_accept::start_async_op(demuxer& demux, flx_drv* drv, void* f)
    36: {
    37:   RECORD_THREAD_INFO(fw);   // records enough info for wakeup
    38:   return start_overlapped();
    39: }
    40: 
    41: void
    42: wasync_accept::iocp_op_finished( DWORD nbytes, ULONG_PTR udat,
    43:   LPOVERLAPPED olp, int err)
    44: {
    45:   waio_base::iocp_op_finished(nbytes, udat, olp, err);
    46: }
    47: 
    48: 
    49: bool
    50: connect_ex::start_async_op(demux::demuxer& demux, flx_drv* drv, void* f)
    51: {
    52:   RECORD_THREAD_INFO(fw);   // records enough info for wakeup
    53:   return start_overlapped();
    54: }
    55: 
    56: void
    57: connect_ex::iocp_op_finished( DWORD nbytes, ULONG_PTR udat,
    58:   LPOVERLAPPED olp, int err)
    59: {
    60:   waio_base::iocp_op_finished(nbytes, udat, olp, err);
    61: }
    62: 
    63: 
    64: bool
    65: wasync_transmit_file::start_async_op(demux::demuxer& demux, flx_drv* drv, void* f)
    66: {
    67:   RECORD_THREAD_INFO(fw);   // records enough info for wakeup
    68:   return start_overlapped();
    69: }
    70: 
    71: void
    72: wasync_transmit_file::iocp_op_finished( DWORD nbytes, ULONG_PTR udat,
    73:   LPOVERLAPPED olp, int err)
    74: {
    75:   waio_base::iocp_op_finished(nbytes, udat, olp, err);
    76: }
    77: 
    78: bool
    79: wsa_socketio::start_async_op(demux::demuxer& demux, flx_drv* drv, void* f)
    80: {
    81:   RECORD_THREAD_INFO(fw); // records enough info for wakeup
    82:   return start_overlapped();    // start overlapped op
    83: }
    84: 
    85: // this could be factored into demux... or it might need
    86: // to stay here... this is really a finished that isn't finished
    87: // same goes for winfileio (I think)
    88: void
    89: wsa_socketio::iocp_op_finished( DWORD nbytes, ULONG_PTR udat,
    90:   LPOVERLAPPED olp, int err)
    91: {
    92:   // fprintf(stderr,"wsa_socketio wakeup, nb: %li, err: %i\n", nbytes, err );
    93: // Doing the handling myself - this can restart the the op giving us
    94: // a possible race condition... or not? It should be sync with this call.
    95:   // wsasocketio_control_block::iocp_op_finished(nbytes, udat, olp, err);
    96: 
    97:   ppb->bytes_written += nbytes;
    98: 
    99:   // if we're not finished, we have to reinstall our request
   100:   // zero bytes indicates shutdown/closure, right?
   101:   // might be using this for WSASend. Instead of broken pipes on win32,
   102:   // instead we get WSAECONNRESET (pretty sure) on write. On read?
   103:   if(0 == nbytes || ppb->finished())
   104:   {
   105:     // this'll wake us up
   106:     waio_base::iocp_op_finished(nbytes, udat, olp, err);
   107:   }
   108:   else
   109:   {
   110:     // go back around again
   111:     // this returns a finished flag (bad idea). it can also fail.
   112:     // I think it would be better to know that.
   113:     if(start_overlapped())
   114:       fprintf(stderr, "socketio restart finished! WHAT TO DO!?!\n");
   115:   }
   116: }
   117: 
   118: // file io
   119: 
   120: bool
   121: winfile_io::start_async_op(demuxer& demux, flx_drv* drv, void* f)
   122: {
   123:   RECORD_THREAD_INFO(fw);   // records enough info for wakeup
   124:   return start_overlapped();  // go
   125: }
   126: 
   127: // this too could be factored back, I think. that waio_base finished
   128: // would have to change. it's just asking for for some fn to realise
   129: // that it's finished.
   130: // the byte count shouldn't need to be updated here...
   131: void
   132: winfile_io::iocp_op_finished( DWORD nbytes, ULONG_PTR udat,
   133:   LPOVERLAPPED olp, int err)
   134: {
   135:   // fprintf(stderr,"winfile_io wakeup, nb: %li, err: %i\n", nbytes, err );
   136: 
   137:   // fprintf(stderr,"THIS WAKEUP SHOULD BE THE SAME AS SEND/RECV make it so\n");
   138:   // actually, I don't think it should be. the SOCKET stuff goes around
   139:   // the loop again (and ignores errors, check it doesn't hammer).
   140: 
   141:   // keep track of bytes received.
   142:   pb.bytes_written += nbytes;
   143: 
   144:   waio_base::iocp_op_finished(nbytes, udat, olp, err);
   145: }
   146: 
   147: }}
   148: 
   149: 
End cpp section to faio/faio_winio.cpp[1]
Start felix section to lib/flx_faio.flx[1 /1 ]
     1: #line 1493 "./lpsrc/flx_faio.pak"
     2: #import <flx.flxh>
     3: 
     4: module Faio {
     5: requires package "demux";
     6: requires package "faio";
     7: 
     8: open C_hack;
     9: 
    10: proc faio_req[t](x:&t) {
    11:   // svc_general takes an arbitrary machine
    12:   // address as an argument: the _ref_ denotes
    13:   // that address. It does NOT point to an 'address'
    14:   // but to an faio request .. however the svc doesn't
    15:   // know the type of such requests objects .. so we use
    16:   // the type 'address' as a convenient dummy
    17:   val y : &address = reinterpret[&address] x;
    18:   svc (svc_general y);
    19: }
    20: 
    21: // Faio library bindings
    22: // this file should be portable. it currently defines only a few simple
    23: // driver interactions and copipes. no stream wrappers in here.
    24: 
    25: // get thread self from driver
    26: proc get_thread(thread: ptr[fthread]) {
    27:     svc (svc_get_fthread thread );
    28: }
    29: 
    30: header stdlib_h = '#include <stdlib.h>';            // malloc, free
    31: 
    32: // be careful of this being optimized away
    33: //JS: why? it isn't used .. and its in C_hack anyhow :)
    34: //RF: I'm not seeing it.
    35: fun malloc: int -> address = 'malloc($1)' requires stdlib_h;
    36: proc free: address = 'free($1);' requires stdlib_h;
    37: 
    38: //header asyncio_h = '#include "faio_asyncio.hpp"';
    39: // could go through and conditionally require this header, but I'm
    40: // not going to right now.
    41: // this pulls in sel_params & demuxers & so on
    42: header = '#include "faio_asyncio.hpp"';
    43: 
    44: type sel_param = "flx::demux::sel_param";
    45: type sel_param_ptr = "flx::demux::sel_param*";
    46: 
    47: // this is felix coding at its least subtle
    48: fun to_ptr : sel_param -> sel_param_ptr = '&$1';
    49: 
    50: 
    51: fun get_bytes_done : sel_param_ptr -> int = '$1->bytes_written';
    52: proc init_pb : sel_param*address*int
    53: = '{$1.buffer=(char*)$2;$1.buffer_size=$3;$1.bytes_written=0;}';
    54: 
    55: // this way when I realise that my eof calc is wrong, I can change it
    56: // everywhere at once. so, should exactly n of n bytes read be eof?
    57: // it currently isn't.
    58: proc calc_eof(pb: sel_param_ptr, len: &int, eof: &bool)
    59: {
    60:     var bytes_done = pb.bytes_done;
    61:     *eof = (bytes_done != *len);
    62:     *len = bytes_done;
    63: }
    64: 
    65: // this is a no-op hack to make it look like the argument t
    66: // is actually being used. useful for a few hairy svc/gc interactions
    67: proc gc_collect_me_not_hack[t]: t = ';';
    68: 
    69: // copipe interface.
    70: 
    71: // copipes are actually implemented as end_point_a <= pipe => end_point_b
    72: // felix only needs to know about endpoints, which we'll call pipes.
    73: type copipe = 'flx::faio::copipe_endpt*';
    74: 
    75: // private calls
    76: proc connect : copipe*fthread*sel_param*bool
    77:  = '$1->pipe->connect((void*)$2, &$3, $4, $1->get_channel($4));';
    78: fun pipe_request : copipe -> address = '$1->pipe';
    79: fun to_ptr : copipe -> address = '$1';      // easier than cast
    80: proc priv_shutdown : copipe*int = '$1->shutdown($2);';
    81: proc priv_delete : copipe = 'delete $1;';
    82: 
    83: // this is awful but I don't know how to interact with flx ptrs
    84: // via the c interface. YUK!
    85: //JS: check out C_hack and _ref_ in the rtl
    86: header 'typedef struct{ flx::faio::copipe_endpt* foo[2]; }sorry_awful;';
    87: type co_awful = 'sorry_awful';
    88: proc init_awful_pair : co_awful = 'flx::faio::copipe_endpt::pipe_pair($1.foo);';
    89: fun get : co_awful*int -> copipe = '$1.foo[$2]';
    90: 
    91: proc flx_copipe_pair(p1: &copipe, p2: &copipe)
    92: {
    93:     var awful: co_awful;
    94:     init_awful_pair(awful);
    95:     *p1 = get(awful, 0);
    96:     *p2 = get(awful, 1);
    97: }
    98: 
    99: // not for public consumption
   100: //JS -- then make it private
   101: // the c++ copipe code figures out what you wanted from the pipe itself.
   102: private proc copipe_driver_request(pipe: copipe)
   103: {
   104:     var request = pipe_request pipe;
   105:     faio_req$ &request;
   106: }
   107: 
   108: proc co_rw(pipe: copipe, len: &int, buf: address, eof: &bool, read_flag: bool)
   109: {
   110:     // Suppose I could move this to start_async_op, it knows the thread, but
   111:     // not whether it's reading or writing
   112:     var thread : fthread;
   113:     get_thread(addr thread);                    // ask driver for thread pointer
   114: 
   115:     var pb: sel_param;
   116:     init_pb(pb, buf, *len);
   117: 
   118:     // connect thread to its apropriate read/write channel
   119:     connect(pipe, thread, pb, read_flag);
   120: 
   121:     copipe_driver_request(pipe);
   122: 
   123:     calc_eof(to_ptr(pb), len, eof);
   124: }
   125: 
   126: // public copipe calls
   127: 
   128: // shutdown pipe endpoint. no matter what how is, pipe must still be closed
   129: // how = 0 => no more reads, 1 no more writes, 2 no more anything
   130: proc co_shutdown(pipe: copipe, how: int)
   131: {
   132:     priv_shutdown(pipe, how);
   133:     // let pipe request be evaluated
   134:     copipe_driver_request(pipe);
   135: }
   136: 
   137: proc co_close(pipe: copipe)
   138: {
   139:     co_shutdown(pipe, 2);                   // no more io, eof those waiting
   140:     priv_delete(pipe);
   141:     // the end!
   142: }
   143: 
   144: proc co_read(pipe: copipe, len: &int, buf: address, eof: &bool)
   145: {
   146:     co_rw(pipe, len, buf, eof, true);       // read
   147: }
   148: 
   149: proc co_write(pipe: copipe, len: &int, buf: address, eof: &bool)
   150: {
   151:     co_rw(pipe, len, buf, eof, false);      // read
   152: }
   153: 
   154: // sleep! finally!
   155: 
   156: type sleep_request = 'flx::faio::sleep_request';
   157: fun mk_sleep_request: double -> sleep_request = 'flx::faio::sleep_request($1)';
   158: 
   159: proc sleep(delta: double)
   160: {
   161:   var sr = mk_sleep_request delta;
   162:   faio_req$ &sr;
   163: }
   164: 
   165: 
   166: } // module faio
   167: 
End felix section to lib/flx_faio.flx[1]
Start felix section to lib/flx_faio_posix.flx[1 /1 ]
     1: #line 1661 "./lpsrc/flx_faio.pak"
     2: #import <flx.flxh>
     3: // contains posix async socket io & copipes, all wrapped up streams
     4: 
     5: include "flx_faio";
     6: module Faio_posix  {
     7: header faio_posixio_hpp = '#include "faio_posixio.hpp"';
     8: requires package "demux";
     9: requires package "faio";
    10: open C_hack;        // cast, address
    11: open Faio;
    12: 
    13: // some random stuff, sorta unixy
    14: header unistd_h = '#include <unistd.h>';            // close
    15: header fcntl_h = '#include <fcntl.h>';              // fcntl for O_NONBLOCK
    16: header sys_socket_h = '#include <sys/socket.h>';    // shutdown
    17: header sockety_h = '#include "demux_sockety.hpp"';  // my socket utils
    18: 
    19: proc close: int = 'close($1);' requires unistd_h;
    20: proc shutdown: int*int = 'shutdown($a);' requires sys_socket_h;
    21: 
    22: // this too can be optimised away to nothing
    23: // mucking around with pushing file fd through the async socket code
    24: fun aio_ropen: string -> int = 'open($1.c_str(), O_RDONLY | O_NONBLOCK, 0)'
    25:     requires fcntl_h;
    26: 
    27: // blocking, read only
    28: fun ropen: charp -> int = 'open($1, O_RDONLY, 0)' requires fcntl_h;
    29: 
    30: 
    31: header = '#include "faio_posixio.hpp"';
    32: 
    33: // socketio_request should be renamed to be async_fd_request
    34: type socketio_request = "flx::faio::socketio_request";
    35: 
    36: fun mk_socketio_request: int*address*int*bool -> socketio_request
    37:     = 'flx::faio::socketio_request($1, (char*)$2, $3, $4)';
    38: 
    39: fun get_pb: socketio_request -> sel_param_ptr = '&$1.sv.pb';
    40: 
    41: // read & write differ only by a flag
    42: proc async_rw(fd: int, len: &int, buf: address, eof: &bool, read_flag: bool)
    43: {
    44:     var asyncb = mk_socketio_request(fd, buf, *len, read_flag);
    45: 
    46:     // magic! (break to driver, wake up when it's all over)
    47:     faio_req$ &asyncb;
    48: 
    49:     calc_eof(asyncb.pb, len, eof);
    50: }
    51: 
    52: proc async_read(fd: int, len: &int, buf: address,
    53:     eof: &bool)
    54: {
    55:     async_rw(fd, len, buf, eof, true);      // read
    56: }
    57: 
    58: proc async_write(fd: int, len: &int, buf: address, eof: &bool)
    59: {
    60:     async_rw(fd, len, buf, eof, false);     // write
    61: }
    62: 
    63: type flxfileio_request = "flx::faio::flxfileio_request";
    64: 
    65: // offset ? let it be for a moment
    66: fun mk_faio: int*address*int*int*bool -> flxfileio_request
    67:     = 'flx::faio::flxfileio_request($1, (char*)$2, $3, $4, $5)';
    68: fun get_pb: flxfileio_request -> sel_param_ptr = '&$1.pb';
    69: 
    70: proc faio_rw(fd: int, len: &int, buf: address, eof: &bool, read_flag: bool)
    71: {
    72:     // constant offset for now, rushing to get this in flx_stream
    73:     var faio = mk_faio(fd, buf, *len, 0, read_flag);
    74:     faio_req$ &faio;
    75:     calc_eof(faio.pb, len, eof);
    76: }
    77: 
    78: proc faio_read(fd: int, len: &int, buf: address,
    79:     eof: &bool)
    80: {
    81:     faio_rw(fd, len, buf, eof, true);       // read
    82: }
    83: 
    84: proc faio_write(fd: int, len: &int, buf: address, eof: &bool)
    85: {
    86:     faio_rw(fd, len, buf, eof, false);      // write
    87: }
    88: 
    89: // connect!
    90: type async_connect = 'flx::faio::connect_request';
    91: 
    92: fun mk_async_connect: charp*int -> async_connect = 'flx::faio::connect_request($a)';
    93: fun get_socket: async_connect -> int = '$1.s';
    94: fun get_err: async_connect -> int = '$1.socket_err';
    95: 
    96: // could do multi connects for capable drivers
    97: proc connect(s: &int, addr: charp, port: int)
    98: {
    99:     var ac = mk_async_connect(addr, port);
   100:     faio_req$ &ac;
   101:     *s = ac.socket;
   102: }
   103: 
   104: type accept_request = "flx::faio::accept_request";
   105: 
   106: fun mk_accept: int -> accept_request = 'flx::faio::accept_request($1)';
   107: fun get_socket: accept_request -> int = '$1.accepted';
   108: 
   109: // arg1 = returned socket, arg2 is port, pass 0 to have one assigned
   110: proc mk_listener: lvalue[int]*lvalue[int]*int
   111:     = '$1 = flx::demux::create_async_listener(&$2, $3);' requires sockety_h;
   112: 
   113: proc accept(s: &int, listener: int)
   114: {
   115:     var acc = mk_accept listener ;
   116:     faio_req$ &acc;
   117:     *s = acc.socket;
   118: }
   119: 
   120: 
   121: 
   122: } // module faio
   123: 
End felix section to lib/flx_faio_posix.flx[1]
Start felix section to lib/flx_faio_win32.flx[1 /1 ]
     1: #line 1785 "./lpsrc/flx_faio.pak"
     2: #import <flx.flxh>
     3: include "flx_faio";     // defines copipes & some driver interaction
     4: 
     5: module Faio_win32 {
     6: requires package "demux";
     7: requires package "faio";
     8: // contains windows overlapped/iocp io & copipes. no stream wrapper yet.
     9: open C_hack;
    10: open Faio;
    11: header '#include "faio_winio.hpp"'; // this has everything (includes asyncio.h)
    12: 
    13: // useful windows function
    14: fun GetLastError: 1 -> int = 'GetLastError()';
    15: 
    16: ctypes SOCKET;
    17: 
    18: // maybe don't use this - let the socket be passed in already associated
    19: // with an IOCP. do I have to make this explicitly overlapped? If we
    20: // want async io I think I'll need to associate this with the iocp.
    21: fun cmk_socket : unit -> SOCKET = '::socket(AF_INET, SOCK_STREAM, IPPROTO_TCP)';
    22: 
    23: // well that didn't help.
    24: //fun cmk_socket : unit -> SOCKET = 'WSASocket(AF_INET, SOCK_STREAM, IPPROTO_TCP, NULL, 0, WSA_FLAG_OVERLAPPED)';
    25: // must associate with iocp to do overlapped io with s (WSASend/Recv)
    26: proc mk_socket(s: &SOCKET)
    27: {
    28:     *s = cmk_socket();
    29:     associate_with_iocp(*s);                // associate with iocp (errors?).
    30: }
    31: 
    32: 
    33: type wasync_accept = "flx::faio::wasync_accept";
    34: 
    35: fun mk_accept: SOCKET*SOCKET -> wasync_accept = 'flx::faio::wasync_accept($a)';
    36: // make this a parameterised type
    37: fun get_success[t]: t -> bool = '$1.success';
    38: 
    39: // this feels silly
    40: const INVALID_SOCKET: SOCKET = 'INVALID_SOCKET';
    41: // oops, no good if we can't check against it
    42: fun eq : SOCKET*SOCKET -> bool = '($1 == $2)';
    43: 
    44: // windows style accept. accepted is an already created socket, unbound
    45: proc Accept(success: &bool, listener: SOCKET, accepted: SOCKET)
    46: {
    47:     var acc = mk_accept(listener, accepted);
    48: 
    49:     faio_req$ &acc;    // causes AcceptEx to be called
    50: 
    51:     *success = get_success(acc);
    52: }
    53: 
    54: type connect_ex="flx::faio::connect_ex";
    55: fun mk_connect_ex: SOCKET*charp*int -> connect_ex = 'flx::faio::connect_ex($a)';
    56: 
    57: // for use on sockets you make yourself, who knows, maybe you want to
    58: // reuse them
    59: proc Connect(success: &bool, s: SOCKET, addr: charp, port: int)
    60: {
    61:     var con = mk_connect_ex(s, addr, port);
    62:      faio_req$ &con;    // causes ConnectEx to be called
    63:     *success = get_success(con);
    64: }
    65: 
    66: proc Connect(s: &SOCKET, addr: charp, port: int)
    67: {
    68:     mk_socket s;            // error handling?
    69:     var success: bool;
    70:     Connect(&success, *s, addr, port);
    71:     // print "CONNECT success: "; print success; endl;
    72:     // error handling?
    73: }
    74: 
    75: // listens on all interfaces, I guess
    76: proc cmk_listener: lvalue[SOCKET]*lvalue[int]*int
    77:     = '$1 = flx::demux::create_listener_socket(&$2, $3);';
    78: 
    79: proc mk_listener(listener: &SOCKET, port: &int, backlog: int)
    80: {
    81:     *listener <- cmk_listener(*port, backlog);
    82:     associate_with_iocp(*listener);
    83: }
    84: 
    85: // ignores return value
    86: proc closesocket: SOCKET = 'closesocket($1);';
    87: 
    88: const SD_RECEIVE:int = 'SD_RECEIVE';
    89: const SD_SEND:int = 'SD_SEND';
    90: const SD_BOTH:int = 'SD_BOTH';
    91: 
    92: proc shutdown: SOCKET*int = 'shutdown($1, $2);';
    93: 
    94: type wasync_transmit_file = "flx::faio::wasync_transmit_file";
    95: 
    96: // I could just use HANDLEs everywhere, but I want to see how this goes
    97: type WFILE = 'HANDLE';
    98: const INVALID_HANDLE_VALUE: WFILE = 'INVALID_HANDLE_VALUE';
    99: fun eq : WFILE*WFILE -> bool = '($1 == $2)';
   100: 
   101: 
   102: // hacked for ro atm. the 0 means exclusive (not good, but I haven't deciphered
   103: // the flags yet. NULL for non inheritable security attributes.
   104: // OPEN_EXISTING is to make sure it doesn't create the file
   105: // Geez, FILE_ATTRIBUTE_NORMAL? not hidden, not temp, etc.
   106: // final NULL is for template file. not sure what it does, but I don't want it.
   107: // notice that it's opened for SHARED reading
   108: proc OpenFile: lvalue[WFILE]*string =
   109:   '$1 = CreateFile($2.c_str(), FILE_READ_DATA, FILE_SHARE_READ, NULL,\
   110:     OPEN_EXISTING, FILE_ATTRIBUTE_NORMAL | FILE_FLAG_OVERLAPPED, NULL);';
   111: 
   112: // basically for windows named pipes
   113: proc OpenFileDuplex: lvalue[WFILE]*string =
   114:   '$1 = CreateFile($2.c_str(), FILE_READ_DATA | FILE_WRITE_DATA,\
   115:      FILE_SHARE_READ | FILE_SHARE_WRITE, NULL, OPEN_EXISTING,\
   116:      FILE_ATTRIBUTE_NORMAL | FILE_FLAG_OVERLAPPED, NULL);';
   117: 
   118: proc CloseFile: WFILE = 'if(!CloseHandle($1))\
   119:   fprintf(stderr, "CloseHandle(WFILE) failed: %i\\n", GetLastError());';
   120: 
   121: // error handling?
   122: // proc CloseFile: WFILE = 'CloseHandle($1);';
   123: 
   124: fun mk_transmit_file : SOCKET*WFILE -> wasync_transmit_file
   125:     = 'flx::faio::wasync_transmit_file($a)';
   126: 
   127: // toylike interface for now, but still fun
   128: proc TransmitFile(s: SOCKET, f: WFILE)
   129: {
   130:     var tf = mk_transmit_file(s, f);
   131:     faio_req$ &tf;
   132: }
   133: 
   134: // by passing special flags to TransmitFile we can transform a connected
   135: // socket into a socket ready for use with AcceptEx. DisconnectEx explicitly
   136: // does this and without the warning that accept-style & connect-style sockets
   137: // cannot be reused as the other type (which isn't a problem for my use)
   138: // however I already have TransmitFile code in place.
   139: fun mk_reuse_socket : SOCKET -> wasync_transmit_file
   140:     = 'flx::faio::wasync_transmit_file($a)';
   141: 
   142: proc ReuseSocket(s: SOCKET)
   143: {
   144:     var tf = mk_reuse_socket(s);
   145:     faio_req$ &tf;
   146: }
   147: 
   148: type wsa_socketio = "flx::faio::wsa_socketio";
   149: fun mk_wsa_socketio: SOCKET*sel_param_ptr*bool->wsa_socketio = 'flx::faio::wsa_socketio($a)';
   150: 
   151: proc WSARecv(s: SOCKET, len: &int, buf: address, eof: &bool)
   152: {
   153:     var pb: sel_param;
   154:     init_pb(pb, buf, *len);
   155:     var ppb: sel_param_ptr = to_ptr pb;
   156: 
   157:     var rev = mk_wsa_socketio(s, ppb, true);  // reading
   158:     faio_req$ &rev;
   159: // we do have a success flag
   160:     calc_eof(ppb, len, eof);
   161: }
   162: 
   163: proc WSASend(s: SOCKET, len: &int, buf: address, eof: &bool)
   164: {
   165:     var pb: sel_param;
   166:     init_pb(pb, buf, *len);
   167:     var ppb: sel_param_ptr = to_ptr pb;
   168: 
   169:     var rev = mk_wsa_socketio(s, ppb, false); // writing
   170:     faio_req$ &rev;
   171:     calc_eof(ppb, len, eof);
   172: }
   173: 
   174: type winfile_io = "flx::faio::winfile_io";
   175: 
   176: fun mk_winfile_io: WFILE*address*int*bool->winfile_io = 'flx::faio::winfile_io($a)';
   177: 
   178: // no offset - just for streams now. write probably doesn't work
   179: fun get_pb: winfile_io -> sel_param_ptr = '&$1.pb';
   180: 
   181: proc ReadFile(f: WFILE, len: &int, buf: address, eof: &bool)
   182: {
   183:     var io = mk_winfile_io(f, buf, *len, true); // reading
   184:     faio_req$ &io;
   185: // we do have a success flag
   186:     calc_eof(io.pb, len, eof);
   187: }
   188: 
   189: proc WriteFile(f: WFILE, len: &int, buf: address, eof: &bool)
   190: {
   191:     var io = mk_winfile_io(f, buf, *len, false);    // writing
   192:     faio_req$ &io;
   193:     calc_eof(io.pb, len, eof);
   194: }
   195: 
   196: 
   197: // general request for addition of socket to iocp. might be better to
   198: // just create them that way.
   199: type iocp_associator = "flx::faio::iocp_associator";
   200: fun mk_iocp_associator: SOCKET -> iocp_associator = 'flx::faio::iocp_associator($1)';
   201: 
   202: // this ends up just casting to a handle, so I should be able to use
   203: // this for other HANDLEs. Note that the user cookie is not settable
   204: // via this interface.
   205: proc associate_with_iocp(s: SOCKET)
   206: {
   207:     // results? err code?
   208:     var req = mk_iocp_associator(s);
   209:     faio_req$ &req;
   210: }
   211: 
   212: } // module win32_faio
   213: 
End felix section to lib/flx_faio_win32.flx[1]
Start felix section to lib/flx_stream.flx[1 /1 ]
     1: #line 1999 "./lpsrc/flx_faio.pak"
     2: include "flx_faio";
     3: include "flx_faio_posix";
     4: include "flx_faio_win32";
     5: #import <flx_platform.flxh>
     6: 
     7: open Faio;  // copipes
     8: 
     9: #if POSIX
    10: open Faio_posix;
    11: #endif
    12: 
    13: #if WIN32
    14: open Faio_win32;
    15: #endif
    16: 
    17: 
    18: module Flx_stream {
    19: requires package "demux";
    20: requires package "faio";
    21: //JS: the type name should be flx_stream_t by convention ..
    22: union flx_stream =
    23:     | CO of copipe     // copipe
    24:     | DEVNULL          // nothing
    25: #if POSIX
    26:     | UFD of int       // Unix file
    27:     | USOCK of int     // Unix socket
    28: #endif
    29: // isn't this covered by the fact that under cygwin POSIX AND WIN32 are true?
    30: //#if WIN32 or CYWGIN
    31: #if WIN32
    32:     | WSOCK of SOCKET  // Windows socket
    33:     | WFD of WFILE     // Windows file
    34: #endif
    35: 
    36: // MISSING CASES: strstreams  -- strings as streams
    37: // FILE* streams, C++ iostreams as streams. kind of messy ..
    38: // windows named pipes (may or may not work with the WFD file io)
    39: // try it out.
    40: 
    41: ;
    42: 
    43: // will be ambiguous for file fd, will need name. do I even need these
    44: // when the conversion is unambiguous?
    45: 
    46: //JS: use the constructor, no need for a separate
    47: //JS: function to make things, that's what
    48: //JS: type constructors are for .. maybe the names
    49: //JS: should be changed to FLX_STRM_UFD or something ..
    50: 
    51: //fun to_stream (p: copipe): flx_stream = { return CO p; }
    52: //fun new_devnull (): flx_stream = { return DEVNULL; }
    53: //fun file_to_stream (fd: int): flx_stream = { return UFD fd; }
    54: //fun to_stream (fd: int): flx_stream = { return USOCK fd; }
    55: //fun to_stream (f: WFILE): flx_stream = { return WFD f; }
    56: //fun to_stream (s: SOCKET): flx_stream = { return WSOCK s; }
    57: 
    58: proc flx_read(strm: flx_stream, len: &int, buf: address, eof: &bool)
    59: {
    60:     match strm with
    61:     | CO ?pipe => { co_read(pipe, len, buf, eof); }
    62:     | DEVNULL => { *len = 0; *eof = true; }
    63: #if POSIX
    64:     | USOCK ?fd => { async_read(fd, len, buf, eof); }
    65:     | UFD ?fd => { faio_read(fd, len, buf, eof); }
    66: #endif
    67: //#if WIN32 or CYGWIN
    68: #if WIN32
    69:     | WSOCK ?s => { WSARecv(s, len, buf, eof); }
    70:     | WFD ?file => { ReadFile(file, len, buf, eof); }
    71: #endif
    72:     endmatch;
    73: }
    74: 
    75: proc flx_write(strm: flx_stream, len: &int, buf: address, eof: &bool)
    76: {
    77:     match strm with
    78:     | CO ?pipe => { co_write(pipe, len, buf, eof); }
    79:     | DEVNULL => { /* nothing to do */ }
    80: #if POSIX
    81:     | USOCK ?fd => { async_write(fd, len, buf, eof); }
    82:     | UFD ?fd => { faio_write(fd, len, buf, eof); }
    83: #endif
    84: //#if WIN32 or CYGWIN
    85: #if WIN32
    86:     | WSOCK ?s => { WSASend(s, len, buf, eof); }
    87:     | WFD ?file => { WriteFile(file, len, buf, eof); }
    88: #endif
    89:     endmatch;
    90: }
    91: 
    92: proc flx_shutdown(strm: flx_stream, how: int)
    93: {
    94:     match strm with
    95:     | CO ?pipe => { co_shutdown(pipe, how); }
    96:     | DEVNULL => { /* nothing to do? */ }
    97: #if POSIX
    98:     | USOCK ?socket => { shutdown(socket, how); }
    99:     | UFD => { /* nuthin */ }
   100: #endif
   101: //#if WIN32 or CYGWIN
   102: #if WIN32
   103:     | WFD => { /* nothing to do */ }
   104:     | WSOCK ?socket => { shutdown(socket, how); }
   105: #endif
   106:     endmatch;
   107: }
   108: 
   109: proc flx_close(strm: flx_stream)
   110: {
   111:     match strm with
   112:     | CO ?pipe => { co_close(pipe); }
   113:     | DEVNULL => { /* nothing to do */ }
   114: #if POSIX
   115:     | USOCK ?socket => { close(socket); }    // error check?
   116:     | UFD ?fd => { close(fd); }          // error check?
   117: #endif
   118: //#if WIN32 or CYGWIN
   119: #if WIN32
   120:     | WSOCK ?socket => { closesocket(socket); }
   121:     | WFD ?file => { CloseFile(file); }
   122: #endif
   123:     endmatch;
   124: }
   125: 
   126: proc flx_popen(ourend: &flx_stream, p:flx_stream->0)
   127: {
   128:     var a: copipe;
   129:     var b: copipe;
   130: 
   131:     flx_copipe_pair(&a, &b);
   132:     *ourend = CO a;
   133:     var theirend = CO b;
   134:     spawn_fthread { p theirend; };
   135: }
   136: 
   137: 
   138: 
   139: // fun things that probably need renaming
   140: proc cat(infd: flx_stream, outfd: flx_stream, buf: address, bufsize: int) {
   141:     var eof = false;
   142:     var weof = false;
   143:     var len: int;
   144: 
   145:     // if we finish input, stop. if output eofs, don't keep hammering on it!
   146:     while{not(eof) and not(weof)} {
   147:         len = bufsize;
   148: // print "catting in "; print len; print " bytes\n";
   149:         flx_read(infd, &len, buf, &eof);
   150: // print "catting out "; print len; print " bytes\n";
   151:         flx_write(outfd, &len, buf, &weof);
   152:     };
   153: };
   154: 
   155: proc cat(infd: flx_stream, outfd: flx_stream)
   156: {
   157:     val BUFSIZE = 10*1024;
   158:     var buf = malloc(BUFSIZE);
   159: 
   160:     // that's some nice error checking
   161:     cat(infd, outfd, buf, BUFSIZE);
   162:     free(buf);
   163: }
   164: 
   165: open List;
   166: 
   167: // if these are all file descriptors, one might not like to have them
   168: // all open at once. would a generator be better?
   169: // could make this supercat, with multiple in, multiple out (multiout
   170: // in the sense of tee)
   171: proc cat(in_fds: list[flx_stream], outfd: flx_stream,
   172:     buf: address, bufsize: int)
   173: {
   174:     match in_fds with
   175:     | Empty[flx_stream] => {}               // finished
   176:     | Cons[flx_stream] (?fd, ?l) =>
   177:         {
   178:             cat(fd, outfd, buf, bufsize);   // cat first
   179:             cat(l, outfd, buf, bufsize);    // cat the rest
   180:         }
   181:     endmatch
   182:     ;
   183: }
   184: 
   185: // try the stream interface
   186: proc echo(fd: flx_stream, buf: address, bufsize: int)
   187: {
   188:     // echo a = cat a a. that's deep, man.
   189:     cat(fd, fd, buf, bufsize);
   190: };
   191: 
   192: // playing around. need to be able to control io buffers in a more
   193: // global way. I like the supercat idea. need to be careful not to hammer
   194: // multiple outputs. move tee to own file
   195: proc tee(infd: flx_stream, outfd: flx_stream, outfd2: flx_stream)
   196: {
   197:     var eof = false;
   198:     var weof = false;
   199:     var weof2 = false;
   200:     var len: int;
   201: 
   202:     val BUFSIZE = 10*1024;
   203:     var buf = malloc(BUFSIZE);
   204: 
   205:     // don't hammer!
   206:     while{not(eof) and not(weof) and not(weof2)} {
   207:         len = BUFSIZE;
   208:         flx_read(infd, &len, buf, &eof);
   209:         flx_write(outfd, &len, buf, &weof);
   210:         flx_write(outfd2, &len, buf, &weof2);
   211:     };
   212:     free buf;
   213: }
   214: 
   215: } // module Flx_stream
   216: 
End felix section to lib/flx_stream.flx[1]
Start felix section to lib/flx_socket.flx[1 /1 ]
     1: #line 2216 "./lpsrc/flx_faio.pak"
     2: 
     3: include "flx_faio";
     4: #import <flx_platform.flxh>
     5: 
     6: #if POSIX
     7: include "flx_faio_posix";
     8: open Faio_posix;
     9: #endif
    10: 
    11: #if WIN32
    12: include "flx_faio_win32";
    13: open Faio_win32;
    14: #endif
    15: 
    16: // for the flx_socket_to_stream function
    17: include "flx_stream";
    18: open Flx_stream;
    19: 
    20: module Flx_socket {
    21: 
    22: #if POSIX
    23: 
    24: typedef flx_listener = int;  // unix socket
    25: typedef flx_socket = int;
    26: 
    27: fun flx_socket_to_stream(s: flx_socket) : flx_stream => USOCK s;
    28: 
    29: proc mk_flx_listener(l: &flx_listener, port: &int, qlen: int)
    30: {
    31:   // error checking?
    32:   var listener: int <- mk_listener(*port, qlen);
    33:   *l = listener;
    34: }
    35: 
    36: proc flx_accept(l: flx_listener, s: &flx_socket)
    37: {
    38:   accept(s, l);  // success or not? error checking
    39: }
    40: 
    41: #elif WIN32
    42: 
    43: typedef flx_listener = SOCKET;
    44: typedef flx_socket = SOCKET;
    45: 
    46: fun flx_socket_to_stream(s: flx_socket) : flx_stream => WSOCK s;
    47: 
    48: proc mk_flx_listener(l: &flx_listener, port: &int, qlen: int)
    49: {
    50:   // error checking?
    51:   var listener: SOCKET;
    52:   mk_listener(&listener, port, qlen);
    53:   *l = listener;
    54: }
    55: 
    56: proc flx_accept(l: flx_listener, s: &flx_socket)
    57: {
    58:   var success: bool;
    59:   // for async accept on win32 you create the accept socket yourself
    60:   mk_socket(s);  // error check?
    61: 
    62:   // fprint (cout, "flx_accept calls AcceptEx\n");
    63:   Accept(&success, l, *s);
    64: 
    65:   // err
    66:   if success then {} else {fprint (cout, "Accept failed! num?\n"); } endif;
    67: }
    68: 
    69: #endif
    70: 
    71: proc flx_read(s: flx_socket, len: &int, buf: address, eof: &bool)
    72: {
    73: #if POSIX
    74:   async_read(s, len, buf, eof);
    75: #elif WIN32
    76:   WSARecv(s, len, buf, eof);
    77: #endif
    78: }
    79: 
    80: proc flx_write(s: flx_socket, len: &int, buf: address, eof: &bool)
    81: {
    82: #if POSIX
    83:   async_write(s, len, buf, eof);
    84: #elif WIN32
    85:   WSASend(s, len, buf, eof);
    86: #endif
    87: }
    88: 
    89: // this should work without the conditional compilation
    90: proc flx_shutdown(s: flx_socket, how: int)
    91: {
    92: #if POSIX
    93:   shutdown(s, how);
    94: #elif WIN32
    95:   shutdown(s, how);
    96: #endif
    97: }
    98: 
    99: proc flx_close(s: flx_socket)
   100: {
   101: #if POSIX
   102:   close(s);    // error check?
   103: #elif WIN32
   104:   closesocket(s);
   105: #endif
   106: }
   107: 
   108: //#if WIN32 or CYGWIN
   109: #if WIN32
   110: proc flx_connect_win32(strm: &flx_socket, addr: charp, port: int)
   111: {
   112:     var s: SOCKET;
   113:     Connect( &s, addr, port );
   114:     // nice error handling
   115:     *strm = s;
   116: }
   117: #endif
   118: 
   119: #if POSIX
   120: proc flx_connect_posix(strm: &flx_socket, addr: charp, port: int)
   121: {
   122:     var s: int;
   123:     connect( &s, addr, port );
   124:     // nice error handling
   125:     *strm = s;
   126: }
   127: #endif
   128: 
   129: // unified name, ipv4, cygwin gets posix (?)
   130: proc flx_connect(s: &flx_socket, addr: charp, port: int)
   131: {
   132: #if WIN32
   133:     flx_connect_win32(s, addr, port);
   134: #else
   135:     flx_connect_posix(s, addr, port);
   136: #endif
   137: }
   138: 
   139: /// TEMPORARILY PUTTING THIS CRUFTY READ/PUT LINE CODE HERE
   140: 
   141: // write_line's actually harder than this...
   142: // I used to have trouble with the string's memory, just floating away...
   143: 
   144: // currently ignoring how string was terminated
   145: // but you'd like to get a bool result for when you've
   146: // hit eof. also isn't portable wrt linefeeds. crashes when
   147: // run from cygwin. that's a bummer.
   148: proc get_line(strm: flx_socket, s: &string)
   149: {
   150:   var c: char;
   151:   val ac = C_hack::address_of(c);
   152:   var str: string;
   153:   var finished = false;
   154: 
   155:    while{not finished}
   156:    {
   157:      var len = 1;
   158:      var eof: bool;
   159: 
   160:      flx_read(strm, &len, ac, &eof);
   161: 
   162:      // that's not working
   163:      if eof or c == char '\n' then
   164:      {
   165:        // print "OUTTAHEWR\n";
   166:        finished = true;
   167:      }
   168:      else
   169:      {
   170:        // print "got something: "; print c; endl;
   171:        str += c;
   172:        // print "str is now: "; print str; endl;
   173:      } endif;
   174: 
   175:   };
   176: 
   177:   *s = str;  // pass back result
   178: 
   179: }
   180: 
   181: fun str_addr: string -> address = "(void*)$1.c_str()";
   182: 
   183: proc write_string(strm: flx_socket, s: string)
   184: {
   185: // I have to write a copy, else I get bad/stale memory for the string.
   186: // is this something to do with inlining?
   187: var bugger = s;
   188: // print "writing "; print bugger; endl;
   189:   var slen = len (bugger);
   190:   var addr = str_addr(bugger);  // hope this string memory doesn't move on me
   191:   var eof: bool;
   192:   // print "write_line len: "; print slen; endl;
   193:   flx_write(strm, &slen, addr, &eof);
   194: }
   195: } // module Flx_socket
   196: 
End felix section to lib/flx_socket.flx[1]