6. Posix Sockets

Start cpp section to faio/faio_posixio.hpp[1 /1 ]
     1: #line 4 "./lpsrc/flx_posixio.ipk"
     2: #ifndef __FLX_FAIO_POSIXIO_H__
     3: #define __FLX_FAIO_POSIXIO_H__
     4: #include <flx_faio_config.hpp>
     5: 
     6: #include "faio_asyncio.hpp"
     7: 
     8: // We don't need to piggyback much data at all. For now just the demuxer,
     9: // so that we can be woken up, and the buffer info (this replaces the
    10: // felix "socket" thread type, which was ugly).
    11: 
    12: #include "demux_posix_demuxer.hpp"
    13: 
    14: // a new sort of demuxer/event source: file io completions
    15: // haven't given up on using the socket style demuxers yet.
    16: #include "demux_pfileio.hpp"
    17: 
    18: #include "demux_timer_queue.hpp"
    19: 
    20: namespace flx { namespace faio {
    21: 
    22: class FAIO_EXTERN socketio_wakeup : public demux::socket_wakeup {
    23: public:
    24:   demux::sel_param   pb;     // in: what you want, out: what you get
    25:   int       sio_flags;  // direction: PDEMUX_READ or PDEMUX_WRITE (one, not both)
    26:   struct socketio_request *request; // owning request; notify_finished() is called on it
    27: 
    28:   virtual void wakeup(demux::posix_demuxer& demux); // does the i/o, then finishes or re-arms
    29: };
    30: 
    31: // this can handle most unix style io, that is, read & write on sockets,
    32: // files & pipes. NICE. the fact that the socket is now in here may mean
    33: // I can get rid of the epoll hack
    34: // Not sure if this can be used for file fds.
    35: class FAIO_EXTERN socketio_request : public flx_driver_request_base {
    36: public:
    37:     socketio_wakeup sv;         // wakeup handed to the demuxer; sv.request points back here
    38:     demux::posix_demuxer *pd;   // demuxer that sv is queued on
    39:     socketio_request() : pd(0) {}   // Lord Felix demands it. Like STL. pd is nulled, not left indeterminate
    40:     socketio_request(socketio_request const&);      // copies, then re-points sv.request at the copy
    41:     void operator = (socketio_request const&);      // assigns, then re-points sv.request at *this
    42: 
    43:     socketio_request(demux::posix_demuxer *pd_a, int s, char* buf, long len, bool r);  // r: true = read, false = write
    44:     bool start_async_op_impl();  // queues sv on pd; true means queueing failed
    45: };
    46: 
    47: // client open
    48: class FAIO_EXTERN connect_request
    49:   : public flx_driver_request_base, public demux::connect_control_block {
    50: public:
    51:   demux::posix_demuxer *pd;   // demuxer the connect is started on
    52:   connect_request() : pd(0) {}      // flx linkage; pd is nulled, not left indeterminate
    53: 
    54:   connect_request(demux::posix_demuxer *pd_a,const char* addr, int port);
    55:   bool start_async_op_impl();  // true if the connect could not be started
    56:   virtual void wakeup(demux::posix_demuxer&);  // runs the control block's wakeup, then notifies
    57: };
    58: 
    59: // server open
    60: class FAIO_EXTERN accept_request
    61:   : public flx_driver_request_base, public demux::accept_control_block {
    62: public:
    63:   // we sometimes know that there'll be several connections to accept.
    64:   // this'll need a different wakeup - and a different interface between
    65:   // event source & wakeups
    66: 
    67:   demux::posix_demuxer *pd;   // demuxer the accept is started on
    68:   accept_request() : pd(0) {} // flx linkage; pd is nulled, not left indeterminate
    69: 
    70:   // eeh, give that a better name. listener is stored in the inherited s.
    71:   accept_request(demux::posix_demuxer *pd_a, int listener) : pd(pd_a) { s = listener; }
    72: 
    73:   // from flx_driver_request_base: true if the accept could not be started
    74:   bool start_async_op_impl();
    75: 
    76:   // from accept_control_block: retries while accepted is -1, else notifies
    77:   virtual void wakeup(demux::posix_demuxer& demux);
    78: };
    79: 
    80: 
    81: // separate pthread file io
    82: // hum. multiple inheritance
    83: class FAIO_EXTERN flxfileio_request
    84:     : public flx_driver_request_base, public demux::fileio_request
    85: {
    86:     pthread::worker_fifo       *aio_worker;  // worker queue this request is added to as a task
    87: public:
    88:     flxfileio_request();           // flx linkage
    89:     ~flxfileio_request();          // flx linkage
    90: 
    91:     flxfileio_request(
    92:       pthread::worker_fifo *a,
    93:       int f, char* buf, long len, long off, bool rd
    94:      )
    95:         : fileio_request(f, buf, len, off, rd), aio_worker(a)
    96:      {
    97:        //fprintf(stderr, "flxfilio_request ctor\n");
    98:      }
    99: 
   100:     // from driver request: adds this to aio_worker and returns false
   101:     bool start_async_op_impl();
   102:     void finished(); // fileio_request; forwards to notify_finished()
   103: };
   104: 
   105: }}
   106: #endif
   107: 
End cpp section to faio/faio_posixio.hpp[1]
Start cpp section to faio/faio_posixio.cpp[1 /1 ]
     1: #line 112 "./lpsrc/flx_posixio.ipk"
     2: #include <stdio.h>      // printf
     3: #include "faio_posixio.hpp"
     4: #include "demux_sockety.hpp"    // async_connect
     5: 
     6: #include <sys/types.h>  // getsockopt & co
     7: #include <sys/socket.h>
     8: 
     9: #include <unistd.h>     // close
    10: #include <string.h>     // strerror - probably not portable
    11: #include <assert.h>
    12: 
    13: using namespace flx::demux;
    14: namespace flx { namespace faio {
    15: 
    16: connect_request::connect_request(demux::posix_demuxer *pd_a, const char* addr, int port) : pd(pd_a) { s = -1; p = port; addy = addr; }  // s starts out as -1
    17: 
    18: socketio_request::socketio_request(demux::posix_demuxer *pd_a, int s, char* buf, long len, bool read)
    19:   : pd(pd_a)
    20: {
    21:   // describe the transfer buffer; nothing has been processed yet
    22:   sv.pb.buffer = buf;
    23:   sv.pb.buffer_size = len;
    24:   sv.pb.bytes_written = 0;        // really bytes_processed
    25: 
    26:   // demux supports reading AND writing. We don't. Yet.
    27:   if (read) sv.sio_flags = PDEMUX_READ; else sv.sio_flags = PDEMUX_WRITE;
    28:   sv.s = s;
    29:   sv.request = this;              // lets the wakeup notify this request
    30: }
    31: 
    32: socketio_request::socketio_request(socketio_request const &a) : flx_driver_request_base(a), pd(a.pd)
    33: {
    34:   // copy the base part too, as operator= does; it used to be default-constructed
    35:   sv = a.sv;
    36:   sv.request = this;   // the copied wakeup must refer to this copy, not to a
    37: }
    38: 
    39: // EXTREME HACKERY!
    40: void socketio_request::operator=(socketio_request const &a)
    41: {
    42:   // Assign the base part, the demuxer and the wakeup, then apply one
    43:   // fix-up: the wakeup copied from a still refers to a, not to us.
    44:   flx_driver_request_base::operator=(a);
    45:   pd = a.pd;
    46:   sv = a.sv;
    47:   sv.request = this;
    48: }
    49: 
    50: bool
    51: socketio_request::start_async_op_impl()
    52: {
    53:   // Queue our wakeup on the demuxer for the direction fixed at construction.
    54:   // Returns true when queueing failed ("wake thread if call failed"),
    55:   // false when the wakeup was accepted by the demuxer.
    56: 
    57:   if (pd->add_socket_wakeup(&sv, sv.sio_flags) != -1)
    58:   {
    59:     //fprintf(stderr,"socketio_request OK %p\n",this);
    60:     return false;
    61:   }
    62:   fprintf(stderr,"socketio_request FAILED %p, sock=%d, dir=%d\n",this, sv.s, sv.sio_flags);
    63:   return true;
    64: }
    65: 
    66: 
    67: void
    68: socketio_wakeup::wakeup(posix_demuxer& demux)
    69: {
    70:   // Runs when the demuxer fires this wakeup. Does one recv/send on pb,
    71:   // then finishes the request or puts this wakeup back on the demuxer.
    72:   bool  connection_closed;
    73: 
    74:   //fprintf(stderr, "making socketio_wakeup %p\n",this);
    75:   //fprintf(stderr,"prehandle wakeup, this: %p, read: %i, len: %i, done %i\n",
    76:   //  this, read, pb.buffer_size, pb.bytes_written);
    77: 
    78:   // NOTE: this code does not handle the possibility of both read AND
    79:   // write being set. That would require thinking about what the
    80:   // connection_closed return value meant. In any case, we don't
    81:   // do that stuff here yet.
    82: 
    83:   if(wakeup_flags & PDEMUX_ERROR)
    84:   {
    85:     connection_closed = true;
    86:     //pb.bytes_written=0;
    87:   }
    88: 
    89:   else if(wakeup_flags & PDEMUX_EOF)
    90:   {
    91:     connection_closed = true;
    92:     //pb.bytes_written=0;
    93:   }
    94: 
    95:   else if(wakeup_flags & PDEMUX_READ)
    96:   {
    97:     // just check that our above assumption hasn't been violated.
    98:     assert(wakeup_flags == PDEMUX_READ);
    99:     connection_closed = posix_demuxer::socket_recv(s, &pb);
   100:   }
   101:   else
   102:   {
   103:     // never hurts to be paranoid.
   104:     assert(wakeup_flags == PDEMUX_WRITE);
   105:     connection_closed = posix_demuxer::socket_send(s, &pb);
   106:   }
   107: 
   108:   // fprintf(stderr,"posthandle wakeup, this: %p, read: %i, len: %i, done %i\n",
   109:   //  this, read, pb.buffer_size, pb.bytes_written);
   110:   // fprintf(stderr,"wakeup of %p, closed = %i\n", this, connection_closed);
   111: 
   112:   // wake up: time to process some data
   113:   bool all_done = connection_closed || pb.bytes_written == pb.buffer_size;
   114: 
   115:   // Not done: re-arm. If that fails no further wakeup will ever arrive,
   116:   // so finish now instead of leaving the request waiting forever.
   117:   if(!all_done && demux.add_socket_wakeup(this, sio_flags) == -1)
   118:   {
   119:     fprintf(stderr,"failed to re-add_socket_wakeup\n");
   120:     all_done = true;
   121:   }
   122:   if(all_done) request->notify_finished();
   123: }
   124: 
   125: // asynchronous connect
   126: bool
   127: connect_request::start_async_op_impl()
   128: {
   129:   //fprintf(stderr,"connect_request %p: start async_op_impl\n",this);
   130: 
   131:   // NONONONONO! Referring to this's variables after a successful start
   132:   // gives rise to a race condition, which is bad. Only the local result
   133:   // of start() is looked at from here on.
   134:   const bool failed = (start(*pd) == -1);
   135: 
   136:   // call failed or finished (!), wake up thread as no wakeup coming
   137:   if(failed)
   138:     fprintf(stderr, "FAILED TO SPAWN CONNECT REQUEST\n");
   139:   //else fprintf(stderr, "CONNECT REQUEST SPAWNED\n");
   140:   return failed;     // false: do not reschedule after a successful start
   141: 
   142: /*
   143:   // I've not seen this yet, don't know why.
   144:   if(0 == socket_err) fprintf(stderr, "WOW, instant CONNECT\n");
   145: 
   146:   // call didn't fail, could be pending or finished.
   147:   // return socket_err != EINPROGRESS, the contrapositive, sort of
   148:   return 0 == socket_err;   // no err => finished immediately
   149: */
   150: }
   151: 
   152: void
   153: connect_request::wakeup(posix_demuxer& demux)
   154: {
   155:   //fprintf(stderr, "connect_request::wakeup\n");
   156:   //fprintf(stderr,"connect woke up\n");
   157:   // let the control block do its part first ...
   158:   this->connect_control_block::wakeup(demux);
   159: 
   160:   // ... then wake the felix thread, which can pick out the error itself.
   161:   this->notify_finished();
   162: }
   163: 
   164: 
   165: // async accept
   166: bool
   167: accept_request::start_async_op_impl()
   168: {
   169:   //fprintf(stderr,"accept_request %p: start async_op_impl\n",this);
   170:   // start() is the accept_control_block function; -1 reports failure
   171:   if(start(*pd) != -1)
   172:     return false;     //fprintf(stderr, "ACCEPT REQUEST SPAWNED\n");
   173: 
   174:   fprintf(stderr, "FAILED TO SPAWN ACCEPT REQUEST\n");
   175:   return true;
   176: }
   177: 
   178: void
   179: accept_request::wakeup(posix_demuxer& demux)
   180: {
   181:   // does the leg work.
   182:   accept_control_block::wakeup(demux);
   183: 
   184:   if(accepted == -1)
   185:   {
   186:     // I don't know if this is a good idea...
   187:     fprintf(stderr, "accept request failed (%i), retrying...\n",
   188:       socket_err);
   189:     // didn't get it - go back to sleep. Once restarted we may be woken
   190:     // again at any time, so touch nothing more and just leave.
   191:     if(start(demux) != -1) return;
   192:     // could not re-queue: no wakeup will come, so report accepted == -1
   193:     fprintf(stderr, "failed again... probably was a bad idea\n");
   194:   }
   195:   notify_finished();
   196: }
   197: 
   198: // flx linkage: a default-constructed request has a null worker queue
   199: flxfileio_request::~flxfileio_request(){}
   200: flxfileio_request::flxfileio_request() : aio_worker(0) {}
   201: 
   202: 
   203: bool
   204: flxfileio_request::start_async_op_impl()
   205: {
   206:   //fprintf(stderr,"flxfileio_request: start async_op_impl\n");
   207:   // printf("driver called fileio start_async_op code\n");
   208: 
   209:   // need to create the async io thing here, or ask the driver for it
   210:   // driver needs to go a little less portable
   211:   aio_worker->add_worker_task(this);
   212: 
   213:   return false;       // no wakeup
   214: }
   215: // from fileio_request: forwards completion to the driver request
   216: void
   217: flxfileio_request::finished() { notify_finished(); }
   218: }}
   219: 
End cpp section to faio/faio_posixio.cpp[1]
Start felix section to lib/flx_faio_posix.flx[1 /1 ]
     1: #line 332 "./lpsrc/flx_posixio.ipk"
     2: #import <flx.flxh>
     3: // contains posix async socket io & copipes, all wrapped up streams
     4: 
     5: include "pthread";
     6: include "flx_faio";
     7: include "flx_demux";
     8: 
     9: module Faio_posix  {
    10: header faio_posixio_hpp = '#include "faio_posixio.hpp"';
    11: requires package "demux";
    12: requires package "faio";
    13: open C_hack;        // cast, address
    14: open Faio;
    15: open Pthread;
    16: open Demux;
    17: 
    18: header unistd_h = '#include <unistd.h>';            // close
    19: header fcntl_h = '#include <fcntl.h>';              // fcntl for O_NONBLOCK
    20: header sys_stat_h = '#include <sys/stat.h>';        // struct stat, stat/fstat, mode_t, S_* permissions
    21: header sys_socket_h = '#include <sys/socket.h>';    // shutdown
    22: header sockety_h = '#include "demux_sockety.hpp"';  // my socket utils
    23: header = '#include "faio_posixio.hpp"';
    24: 
    25: 
    26: type fd_t = "int";                      // file descriptor, bound to C int
    27: fun invalid: fd_t -> bool="$1==-1";     // true for the -1 descriptor
    28: 
    29: instance Str[fd_t] {
    30:   fun str: fd_t -> string = "flx::rtl::strutil::str<int>($1)" requires flx_strutil;
    31: }
    32: 
    33: instance Str[socket_t] {                // socket_t itself is declared further down
    34:   fun str: socket_t -> string = "flx::rtl::strutil::str<int>($1)" requires flx_strutil;
    35: }
    36: 
    37: proc close: socket_t = 'close($1);' requires unistd_h;   // result of close() is not checked
    38: proc close: fd_t = 'close($1);' requires unistd_h;       // result of close() is not checked
    39: 
    40: proc shutdown: socket_t*int = 'shutdown($a);' requires sys_socket_h;  // result of shutdown() is not checked
    41: 
    42: fun bad_socket : socket_t -> bool = "$1 == -1";   // same -1 test as invalid, for sockets
    43: 
    44: type posix_permissions = "mode_t" requires sys_stat_h;  // POSIX file mode bits
    45: const S_IRUSR : posix_permissions;  // owner read
    46: const S_IWUSR : posix_permissions;  // owner write
    47: const S_IXUSR : posix_permissions;  // owner execute
    48: const S_IRGRP : posix_permissions;  // group read
    49: const S_IWGRP : posix_permissions;  // group write
    50: const S_IXGRP : posix_permissions;  // group execute
    51: const S_IROTH : posix_permissions;  // others read
    52: const S_IWOTH : posix_permissions;  // others write
    53: const S_IXOTH : posix_permissions;  // others execute
    54: 
    55: // non blocking
    56: /*
    57: gen aio_ropen: string -> fd_t = 'open($1.c_str(), O_RDONLY | O_NONBLOCK)'
    58:     requires fcntl_h, sys_stat_h;
    59: gen aio_wopen: string -> fd_t = ' open($1.c_str(), O_WRONLY | O_NONBLOCK | O_CREAT | O_TRUNC, S_IRUSR|S_IWUSR)'
    60:     requires fcntl_h, sys_stat_h;
    61: gen aio_rwopen: string -> fd_t = ' open($1.c_str(), O_RDWR | O_NONBLOCK | O_CREAT | O_TRUNC, S_IRUSR|S_IWUSR)'
    62:     requires fcntl_h, sys_stat_h;
    63: gen aio_creat: string * posix_permissions-> fd_t = ' open($1.c_str(), O_RDWR | O_NONBLOCK | O_CREAT | O_TRUNC, $2)'
    64:     requires fcntl_h, sys_stat_h;
    65: */
    66: 
    67: 
    68: // blocking. c_str() rather than data(): open() needs a NUL-terminated path
    69: gen ropen: string -> fd_t = 'open($1.c_str(), O_RDONLY,0)' requires fcntl_h, sys_stat_h;
    70: gen wopen: string -> fd_t = 'open($1.c_str(), O_WRONLY | O_CREAT | O_TRUNC, S_IRUSR | S_IWUSR)' requires fcntl_h, sys_stat_h;
    71: gen rwopen: string -> fd_t = 'open($1.c_str(), O_RDWR,0)' requires fcntl_h, sys_stat_h;
    72: gen creat: string * posix_permissions -> fd_t = 'open($1.c_str(), O_WRONLY | O_CREAT | O_TRUNC, $2)' requires fcntl_h, sys_stat_h;
    73: 
    74: fun access: string -> posix_permissions = "get_perm($1.c_str())"  // st_mode of a path; 0 if stat() fails
    75:   requires body """
    76:   mode_t get_perm(char const *f)
    77:   {
    78:     struct stat b;
    79:     if (stat(f,&b) != 0) return 0;
    80:     return b.st_mode;
    81:   }
    82:   """
    83: ;
    84: 
    85: fun access: fd_t -> posix_permissions = "get_perm($1)"  // st_mode of an open descriptor; 0 if fstat() fails
    86:   requires body """
    87:   mode_t get_perm(int f)
    88:   {
    89:     struct stat b;
    90:     if (fstat(f,&b) != 0) return 0;
    91:     return b.st_mode;
    92:   }
    93:   """
    94: ;
    95: 
    96: type socket_t = "int";   // socket descriptor, bound to C int
    97: // socketio_request should be renamed to be async_fd_request
    98: type socketio_request = "flx::faio::socketio_request";
    99: 
   100: gen mk_socketio_request: demuxer * socket_t*address*int*bool -> socketio_request  // args: demuxer, socket, buffer, length, read?
   101:     = 'flx::faio::socketio_request($1, $2, (char*)$3, $4, $5)';
   102: 
   103: fun get_pb: socketio_request -> sel_param_ptr = '&$1.sv.pb';  // address of the request's buffer descriptor
   104: 
   105: // read & write differ only by a flag
   106: proc async_rw(fd: socket_t, len: &int, buf: address, eof: &bool, read_flag: bool)
   107: {
   108:     var asyncb = mk_socketio_request(sys_demux,fd, buf, *len, read_flag);  // *len is the requested byte count
   109:     faio_req$ &asyncb;   // presumably submits the request and resumes when it has finished - TODO confirm in flx_faio
   110:     calc_eof(asyncb.pb, len, eof);   // presumably sets *len and *eof from the buffer descriptor - TODO confirm
   111: }
   112: 
   113: proc async_read(fd: socket_t, len: &int, buf: address,
   114:     eof: &bool)
   115: {
   116:     async_rw(fd, len, buf, eof, true);      // read
   117: }
   118: 
   119: proc async_write(fd: socket_t, len: &int, buf: address, eof: &bool)
   120: {
   121:     async_rw(fd, len, buf, eof, false);     // write
   122: }
   123: 
   124: type flxfileio_request = "flx::faio::flxfileio_request";
   125: 
   126: // connect!
   127: type async_connect = 'flx::faio::connect_request';
   128: 
   129: fun mk_async_connect: demuxer * charp*int-> async_connect = 'flx::faio::connect_request($a)';  // args: demuxer, address, port
   130: fun get_socket: async_connect -> socket_t = '$1.s';     // the request's s member
   131: fun get_err: async_connect -> int = '$1.socket_err';    // the request's socket_err member
   132: 
   133: // could do multi connects for capable drivers
   134: proc connect(s: &socket_t, addr: charp, port: int)
   135: {
   136:     var ac = mk_async_connect(sys_demux,addr, port);
   137:     faio_req$ &ac;
   138:     *s = ac.socket;   // presumably resolves to get_socket ac - TODO confirm; errors are left for the caller to detect
   139: }
   140: 
   141: type accept_request = "flx::faio::accept_request";
   142: 
   143: fun mk_accept: demuxer * socket_t -> accept_request = 'flx::faio::accept_request($1,$2)';  // args: demuxer, listening socket
   144: fun get_socket: accept_request -> int = '$1.accepted';   // the request's accepted member
   145: 
   146: // arg1 = returned socket, arg2 is port, pass 0 to have one assigned; arg3 is the backlog
   147: proc cmk_listener: lvalue[socket_t]*lvalue[int]*int
   148:     = '$1 = flx::demux::create_async_listener(&$2, $3);' requires sockety_h;
   149: 
   150: proc mk_listener(s: &socket_t, port:&int, backlog:int)
   151: {
   152:     cmk_listener(*s,*port, backlog);   // *port is passed by address, so it can be written back
   153: }
   154: 
   155: proc accept(s: &socket_t, listener: socket_t)
   156: {
   157:     var acc = mk_accept$ sys_demux,listener;
   158:     faio_req$ &acc;
   159:     *s = acc.socket;   // presumably resolves to get_socket acc (the accepted member) - TODO confirm
   160: }
   161: 
   162: // ASYNC FILE IO
   163: 
   164: // offset ? let it be for a moment
   165: fun mk_faio: job_queue * fd_t*address*int*int*bool -> flxfileio_request  // args: queue, fd, buffer, length, offset, read?
   166:     = 'flx::faio::flxfileio_request($1,$2, (char*)$3, $4, $5, $6)';
   167: fun get_pb: flxfileio_request -> sel_param_ptr = '&$1.pb';  // address of the request's buffer descriptor
   168: 
   169: proc faio_rw(q:job_queue, fd: fd_t, len: &int, buf: address, eof: &bool, read_flag: bool)
   170: {
   171:     // constant offset for now, rushing to get this in flx_stream
   172:     var faio = mk_faio(q, fd, buf, *len, 0, read_flag);   // NOTE: the offset argument is always 0
   173:     faio_req$ &faio;
   174:     //print$ f"faio_rw: request %d, actual %d\n" (*len,faio.pb.bytes_done);
   175:     calc_eof(faio.pb, len, eof);   // presumably sets *len and *eof from the buffer descriptor - TODO confirm
   176: }
   177: 
   178: // HACKERY -- system job queue
   179: val qbound = 20;     // first argument to mk_job_queue
   180: val nthreads = 4;    // second argument to mk_job_queue
   181: val sys_job_queue = Pthread::mk_job_queue(qbound,nthreads);  // the one queue used by faio_read and faio_write
   182: 
   183: proc faio_read(fd: fd_t, len: &int, buf: address,
   184:     eof: &bool)
   185: {
   186:     faio_rw(sys_job_queue, fd, len, buf, eof, true);       // read
   187: }
   188: 
   189: proc faio_write(fd: fd_t, len: &int, buf: address, eof: &bool)
   190: {
   191:     faio_rw(sys_job_queue, fd, len, buf, eof, false);      // write
   192: }
   193: 
   194: 
   195: 
   196: } // module faio_posix
   197: 
   198: 
End felix section to lib/flx_faio_posix.flx[1]