3.1. Windows Emulation of POSIX Synchronisation Primitives

Start cpp section to pthread/pthread_win_posix_condv_emul.hpp[1 /1 ]
     1: #line 432 "./lpsrc/flx_pthread.pak"
     2: #ifndef __FLX_PTHREAD_WIN_POSIX_CONDV_EMUL_H__
     3: #define __FLX_PTHREAD_WIN_POSIX_CONDV_EMUL_H__
     4: // Note: no namespaces here!
     5: // See http://www.cs.wustl.edu/~schmidt/win32-cv-1.html
     6: 
     7: #include "flx_pthread_config.hpp"
     8: #if FLX_WIN32
     9: #include <windows.h>
    10: 
        // On Windows the emulated mutex is simply a Win32 HANDLE.
    11: typedef HANDLE pthread_mutex_t;
        // The attribute types are aliases for void: they exist only so that the
        // POSIX-style prototypes below can be declared.
    12: typedef void pthread_mutexattr_t; // do NOT use them!
    13: typedef void pthread_condattr_t; // do NOT use them!
    14: 
        // State of an emulated condition variable (see the URL near the top of
        // this header for the design this appears to follow).
        // NOTE: each comment inside the struct describes the member declared
        // immediately ABOVE it, not the one below.
    15: struct pthread_cond_t
    16: {
    17:   int waiters_count_;
    18:   // Number of waiting threads.
    19: 
    20:   CRITICAL_SECTION waiters_count_lock_;
    21:   // Serialize access to <waiters_count_>.
    22: 
    23:   HANDLE sema_;
    24:   // Semaphore used to queue up threads waiting for the condition to
    25:   // become signaled.
    26: 
    27:   HANDLE waiters_done_;
    28:   // An auto-reset event used by the broadcast/signal thread to wait
    29:   // for all the waiting thread(s) to wake up and be released from the
    30:   // semaphore.
    31: 
    32:   size_t was_broadcast_;
    33:   // Keeps track of whether we were broadcasting or signaling.  This
    34:   // allows us to optimize the code if we're just signaling.
          // Used as a boolean flag: only the values 0 and 1 are ever stored.
    35: };
    36: 
    37: // THIS IS SICK but there ain't no other way in C
        // ETIMEDOUT is given the value of the Win32 WAIT_TIMEOUT code, so the
        // raw result of a Win32 wait can be returned and compared to ETIMEDOUT.
        // NOTE(review): unlike EAGAIN below this define is not guarded; if an
        // <errno.h> that defines ETIMEDOUT is also included the macro would be
        // redefined - confirm for the toolchains in use.
    38: #define ETIMEDOUT WAIT_TIMEOUT
    39: // Looks like EAGAIN is available in MinGW, but not in the VS SDK.
    40: #ifndef EAGAIN
    41: #define EAGAIN WAIT_TIMEOUT
    42: #endif
    43: 
        // Mutex emulation on top of a Win32 mutex handle.
        // The attribute pointer passed to pthread_mutex_init is never read.
    44: int PTHREAD_EXTERN pthread_mutex_init (pthread_mutex_t*, const pthread_mutexattr_t*);
    45: int PTHREAD_EXTERN pthread_mutex_lock(pthread_mutex_t*);
    46: int PTHREAD_EXTERN pthread_mutex_unlock(pthread_mutex_t*);
    47: int PTHREAD_EXTERN pthread_mutex_destroy(pthread_mutex_t*);
    48: 
        // Condition variable emulation.
        // The attribute pointer passed to pthread_cond_init is never read.
    49: int PTHREAD_EXTERN pthread_cond_init (pthread_cond_t*, const pthread_condattr_t*);
    50: int PTHREAD_EXTERN pthread_cond_destroy(pthread_cond_t*);
        // Wait with no time limit.
    51: int PTHREAD_EXTERN pthread_cond_wait (pthread_cond_t*, pthread_mutex_t*);
        // Wait until an absolute point in time (POSIX style).
    52: int PTHREAD_EXTERN pthread_cond_timedwait(pthread_cond_t*, pthread_mutex_t*, struct timespec const*);
        // Non-POSIX extension: wait with a relative timeout given in microseconds.
    53: int PTHREAD_EXTERN pthread_cond_uswait(pthread_cond_t*, pthread_mutex_t*, unsigned long us);
    54: int PTHREAD_EXTERN pthread_cond_signal (pthread_cond_t*);
    55: int PTHREAD_EXTERN pthread_cond_broadcast (pthread_cond_t*);
    56: 
    57: 
        // Semaphore emulation: a sem_t is a Win32 semaphore HANDLE.
    58: typedef HANDLE sem_t;
    59: 
        // pshared is accepted for signature compatibility only; the
        // implementation does not read it.
    60: int PTHREAD_EXTERN sem_init(sem_t *sem, int pshared, unsigned int value);
        // Blocks until the semaphore can be decremented.
    61: int PTHREAD_EXTERN sem_wait(sem_t * sem);
        // Like sem_wait but with a zero timeout, i.e. it never blocks.
    62: int PTHREAD_EXTERN sem_trywait(sem_t * sem);
        // Increments the semaphore by one.
    63: int PTHREAD_EXTERN sem_post(sem_t * sem);
        // Stores the semaphore's count in *sval.
    64: int PTHREAD_EXTERN sem_getvalue(sem_t * sem, int * sval);
        // Closes the underlying handle.
    65: int PTHREAD_EXTERN sem_destroy(sem_t * sem);
    66: 
    67: #else
    68: #include <errno.h>
    69: #include <pthread.h>
    70: #include <semaphore.h>
    71: // Emulate the native Windows functionality on POSIX: wait on a condition
        // variable with a relative timeout expressed in microseconds.
    72: int PTHREAD_EXTERN pthread_cond_uswait( pthread_cond_t*, pthread_mutex_t*, unsigned long us);
    73: #endif
    74: #endif
    75: 
End cpp section to pthread/pthread_win_posix_condv_emul.hpp[1]
Start cpp section to pthread/pthread_win_posix_condv_emul.cpp[1 /1 ]
     1: #line 508 "./lpsrc/flx_pthread.pak"
     2: #include "pthread_win_posix_condv_emul.hpp"
     3: #include <assert.h>
     4: #if FLX_WIN32
     5: #include <string.h>
     6: 
        // Local definition of the time structure taken by
        // pthread_cond_timedwait in this file.
        // NOTE(review): a toolchain whose <time.h> already provides
        // struct timespec would clash with this definition - confirm.
     7: struct timespec {
     8:   unsigned long tv_sec;   // whole seconds
     9:   unsigned long tv_nsec;  // sub-second part; nanoseconds going by the POSIX name - verify against callers
    10: };
    11: 
    12: // RF: unlike real pthread mutexes, windows mutexes are always recursive.
    13: // that's annoying because I use deadlock as a debugging tool. the upshot
    14: // is that recursively acquiring a mutex gives undefined results.
        //
        // Creates an unnamed, initially unowned Win32 mutex and stores its handle
        // in *m.  The attribute argument is ignored.
        // Returns 0 on success, or EAGAIN when the system could not create the
        // mutex (CreateMutex returned NULL).
    15: int pthread_mutex_init (pthread_mutex_t *m, const pthread_mutexattr_t*)
    16: {
    17:   *m = CreateMutex(NULL,FALSE,NULL);
    18:   return *m != NULL ? 0 : EAGAIN;
    19: }
    20: 
        // Blocks until the calling thread owns the mutex *m.
        // Returns 0 once ownership has been obtained, otherwise the Win32 error
        // code reported by GetLastError().
    21: int pthread_mutex_lock(pthread_mutex_t *m)
    22: {
    23:   DWORD res = WaitForSingleObject(*m,INFINITE);
          // WAIT_ABANDONED still grants ownership (the previous owner ended
          // without releasing), so only WAIT_FAILED is treated as an error.
    24:   return res == WAIT_FAILED ? (int)GetLastError() : 0;
    25: }
    26: 
        // Releases the calling thread's ownership of the mutex *m.
        // Returns 0 on success, otherwise the Win32 error code (for example when
        // the caller does not own the mutex).
    27: int pthread_mutex_unlock(pthread_mutex_t *m)
    28: {
    29:   return ReleaseMutex(*m) ? 0 : (int)GetLastError();
    31: }
    32: 
        // Closes the Win32 handle behind the mutex *m.
        // Returns 0 on success, otherwise the Win32 error code.
    33: int pthread_mutex_destroy(pthread_mutex_t *m)
    34: {
    35:   return CloseHandle(*m) ? 0 : (int)GetLastError();
    37: }
    38: 
        // Initialises the condition variable *cv: zeroes the bookkeeping fields
        // and creates the semaphore, the critical section and the auto-reset
        // event it is built from.  The attribute argument is ignored.
        // Returns 0 on success, or EAGAIN if a kernel object could not be
        // created; in that case nothing created so far is left behind.
    39: int
    40: pthread_cond_init
    41: (
    42:   pthread_cond_t *cv,
    43:   const pthread_condattr_t *
    44: )
    45: {
    46:   cv->waiters_count_ = 0;
    47:   cv->was_broadcast_ = 0;
    48:   cv->sema_ = CreateSemaphore
    49:   (
    50:     NULL,       // no security
    51:     0,          // initially 0
    52:     0x7fffffff, // max count .. (I hate limits .. but thats a lot of pthreads)
    53:     NULL        // unnamed
    54:   );
          if (cv->sema_ == NULL)
            return EAGAIN;
    55:   InitializeCriticalSection (&cv->waiters_count_lock_);
    56:   cv->waiters_done_ = CreateEvent
    57:   (
    58:     NULL,  // no security
    59:     FALSE, // auto-reset
    60:     FALSE, // non-signaled initially
    61:     NULL   // unnamed
    62:   );
          if (cv->waiters_done_ == NULL)
          {
            // Undo the partial initialisation so nothing leaks.
            DeleteCriticalSection (&cv->waiters_count_lock_);
            CloseHandle (cv->sema_);
            cv->sema_ = NULL;
            return EAGAIN;
          }
    63:   return 0;
    64: }
    65: 
        // Releases everything pthread_cond_init acquired for *cv: both kernel
        // handles and the critical section guarding the waiter count.
        // Always returns 0.
    66: int
    67: pthread_cond_destroy(pthread_cond_t *cv)
    68: {
    69:   CloseHandle(cv->sema_);
    70:   CloseHandle(cv->waiters_done_);
          // The critical section was set up with InitializeCriticalSection and
          // must be deleted too, otherwise its resources are leaked.
          DeleteCriticalSection(&cv->waiters_count_lock_);
    71:   return 0;
    72: }
    73: 
    74: // Shared wait routine behind pthread_cond_wait, pthread_cond_uswait and
        // pthread_cond_timedwait.
        //   cv             - condition variable to wait on
        //   external_mutex - mutex released while waiting and re-acquired before
        //                    this function returns
        //   ms             - timeout handed to SignalObjectAndWait
        // Returns the result of the first SignalObjectAndWait call.  Because the
        // header defines ETIMEDOUT as WAIT_TIMEOUT, a timed-out wait is reported
        // as ETIMEDOUT.
        // NOTE(review): the Win32 result is a DWORD but is stored in an int.
    75: static int
    76: private_cond_wait
    77: (
    78:   pthread_cond_t *cv,
    79:   pthread_mutex_t *external_mutex,
    80:   unsigned long ms
    81: )
    82: {
    83:   // Avoid race conditions.
          // Register this thread as a waiter while holding the count lock.
    84:   EnterCriticalSection (&cv->waiters_count_lock_);
    85:   cv->waiters_count_++;
    86:   LeaveCriticalSection (&cv->waiters_count_lock_);
    87: 
    88:   // This call atomically releases the mutex and waits on the
    89:   // semaphore until <pthread_cond_signal> or <pthread_cond_broadcast>
    90:   // are called by another thread.
          // It also returns once <ms> has elapsed; that case is not treated
          // specially below, the bookkeeping is the same for every outcome.
    91:   int res = SignalObjectAndWait (*external_mutex, cv->sema_, ms, FALSE);
    92: 
    93:   // Reacquire lock to avoid race conditions.
    94:   EnterCriticalSection (&cv->waiters_count_lock_);
    95: 
    96:   // We're no longer waiting...
    97:   cv->waiters_count_--;
    98: 
    99:   // Check to see if we're the last waiter after <pthread_cond_broadcast>.
   100:   int last_waiter = cv->was_broadcast_ && cv->waiters_count_ == 0;
   101: 
   102:   LeaveCriticalSection (&cv->waiters_count_lock_);
   103: 
   104:   // If we're the last waiter thread during this particular broadcast
   105:   // then let all the other threads proceed.
   106:   if (last_waiter)
   107:     // This call atomically signals the <waiters_done_> event and waits until
   108:     // it can acquire the <external_mutex>.  This is required to ensure fairness.
   109:     SignalObjectAndWait (cv->waiters_done_, *external_mutex, INFINITE, FALSE);
   110:   else
   111:     // Always regain the external mutex since that's the guarantee we
   112:     // give to our callers.
   113:     WaitForSingleObject (*external_mutex, INFINITE);
          // The value returned is the outcome of the wait on the semaphore, not
          // of the mutex re-acquisition just above.
   114:   return res;
   115: }
   116: 
        // Waits on cv with no time limit.  external_mutex is released for the
        // duration of the wait and is held again when the call returns.
        // Returns whatever private_cond_wait reports.
   117: int pthread_cond_wait (pthread_cond_t *cv, pthread_mutex_t *external_mutex)
   123: {
          // INFINITE tells the shared wait routine never to time out.
          const unsigned long no_timeout = INFINITE;
   124:   return private_cond_wait (cv, external_mutex, no_timeout);
   125: }
   126: 
   127: // Posix is a pain in the butt here
   128: // we have to get the current time and subtract it
   129: // from the target time to get a duration
   130: // the pain is that we probably wanted a duration
   131: // and had to construct a target time by adding it
   132: // to the current time
   133: //
   134: // to fix this we add the native Windows mode (a duration)
   135: // to posix
   136: 
        // Waits on cv for at most <us> microseconds.
        // Returns the result of private_cond_wait (ETIMEDOUT when the wait
        // timed out).
   137: int
   138: pthread_cond_uswait
   139: (
   140:   pthread_cond_t *cv,
   141:   pthread_mutex_t *external_mutex,
   142:   unsigned long us
   143: )
   144: {
   145: 
   146:   // Windows waits in ms, ours in us: divide (not multiply) by 1000,
          // rounding up so that a short non-zero request does not degenerate
          // into a zero-length poll.
          unsigned long ms = us / 1000 + (us % 1000 != 0 ? 1 : 0);
   147:   return private_cond_wait(cv,external_mutex,ms);
   148: }
   149: 
        // POSIX-style timed wait: waits on cv until it is signalled or the
        // absolute time *abstime has passed.
        // abstime is interpreted the POSIX way, as seconds and nanoseconds since
        // 1970-01-01 UTC (its 32-bit tv_sec could not hold a 1601-based time).
        // Returns the result of private_cond_wait (ETIMEDOUT on timeout).
   150: int
   151: pthread_cond_timedwait
   152: (
   153:   pthread_cond_t *cv,
   154:   pthread_mutex_t *external_mutex,
   155:   struct timespec const *abstime
   156: )
   157: {
          // Deadline in milliseconds.  All arithmetic is done in 64 bits:
          // unsigned long is only 32 bits wide on Windows and would overflow.
   158:   ULONGLONG t1 = (ULONGLONG)abstime->tv_sec * 1000 + abstime->tv_nsec / 1000000;
   161:   FILETIME ft;
   162:   GetSystemTimeAsFileTime(&ft);
   163:   ULARGE_INTEGER now;  // so we can do some maths
          now.LowPart = ft.dwLowDateTime;
          now.HighPart = ft.dwHighDateTime;
          // FILETIME counts 100ns ticks since 1601-01-01; 11644473600 seconds
          // (written as a product to stay clear of 64-bit literal suffixes)
          // separate that from the 1970 epoch of abstime.
          const ULONGLONG epoch_gap = (ULONGLONG)116444736 * 1000000000;
   166:   ULONGLONG t0 = (now.QuadPart - epoch_gap) / 10000; // ms now
   167:   ULONGLONG remaining = t1>t0 ? t1 - t0 : 0;
          // INFINITE is a reserved timeout value meaning "wait for ever", so a
          // very distant deadline is clamped just below it.
          unsigned long timeout = remaining >= INFINITE ? INFINITE - 1 : (unsigned long)remaining;
   168:   return private_cond_wait(cv,external_mutex,timeout);
   169: }
   170: 
        // Wakes one thread waiting on cv, if there is one.  Always returns 0.
   171: int
   172: pthread_cond_signal (pthread_cond_t *cv)
   173: {
          // Sample the waiter count while holding the lock that guards it.
          int pending;
   174:   EnterCriticalSection (&cv->waiters_count_lock_);
   175:   pending = cv->waiters_count_;
   176:   LeaveCriticalSection (&cv->waiters_count_lock_);
   177: 
   178:   // Nobody is waiting: signalling is a no-op.
          if (pending <= 0)
            return 0;
   179: 
          // Let exactly one waiter through the semaphore.
   180:   ReleaseSemaphore (cv->sema_, 1, NULL);
   181:   return 0;
   182: }
   183: 
        // Wakes every thread currently waiting on cv and then blocks until the
        // <waiters_done_> event is signalled.  Does nothing when no thread is
        // waiting.  Always returns 0.
   184: int
   185: pthread_cond_broadcast (pthread_cond_t *cv)
   186: {
   187:   // This is needed to ensure that <waiters_count_> and <was_broadcast_> are
   188:   // consistent relative to each other.
   189:   EnterCriticalSection (&cv->waiters_count_lock_);
   190:   int have_waiters = 0;
   191: 
   192:   if (cv->waiters_count_ > 0) {
   193:     // We are broadcasting, even if there is just one waiter...
   194:     // Record that we are broadcasting, which helps optimize
   195:     // <pthread_cond_wait> for the non-broadcast case.
   196:     cv->was_broadcast_ = 1;
   197:     have_waiters = 1;
   198:   }
   199: 
   200:   if (have_waiters) {
   201:     // Wake up all the waiters atomically.
            // The semaphore is released once per registered waiter, still under
            // the count lock.
   202:     ReleaseSemaphore (cv->sema_, cv->waiters_count_, 0);
   203: 
   204:     LeaveCriticalSection (&cv->waiters_count_lock_);
   205: 
   206:     // Wait for all the awakened threads to acquire the counting
   207:     // semaphore.
            // The event is signalled by the waiter that finds itself last in
            // private_cond_wait.
   208:     WaitForSingleObject (cv->waiters_done_, INFINITE);
   209:     // This assignment is okay, even without the <waiters_count_lock_> held
   210:     // because no other waiter threads can wake up to access it.
   211:     cv->was_broadcast_ = 0;
   212:   }
   213:   else
            // No waiters: just drop the lock taken at the top.
   214:     LeaveCriticalSection (&cv->waiters_count_lock_);
   215:   return 0;
   216: }
   217: 
        // Creates an unnamed Win32 semaphore with initial count <value> and
        // stores its handle in *sem.  pshared is ignored.
        // Returns 0 on success and -1 if the semaphore could not be created.
   218: int sem_init(sem_t *sem, int pshared, unsigned int value)
   219: {
          (void)pshared; // process sharing is not supported by this emulation
   220:   *sem = CreateSemaphore(NULL,value,0x7FFFFFFF,NULL);
   221:   return *sem != NULL ? 0 : -1;
   222: }
   223: 
        // Blocks until the semaphore *sem can be decremented.
        // Returns the raw result of the Win32 wait.
   224: int sem_wait(sem_t * sem)
   225: {
          HANDLE handle = *sem;
          DWORD outcome = WaitForSingleObject(handle, INFINITE);
   226:   return outcome;
   227: }
   228: 
        // Attempts to decrement the semaphore *sem without blocking.
        // Returns the raw result of the Win32 wait.
   229: int sem_trywait(sem_t * sem)
   230: {
          // A zero timeout makes the wait return immediately.
          const DWORD no_wait = 0;
          DWORD outcome = WaitForSingleObject(*sem, no_wait);
   231:   return outcome;
   232: }
   233: 
        // Increments the semaphore *sem by one.
        // Returns 0 on success and -1 on failure, as POSIX sem_post does.
   234: int sem_post(sem_t * sem)
   235: {
          // ReleaseSemaphore reports success as a non-zero BOOL, so it has to
          // be translated rather than returned directly.
   236:   return ReleaseSemaphore(*sem,1,NULL) ? 0 : -1;
   237: }
   238: 
        // Stores the current count of the semaphore *sem in *sval.
        // Returns 0 on success.  Returns -1 and leaves *sval untouched when the
        // count could not be queried.
   239: int sem_getvalue(sem_t * sem, int * sval)
   240: {
   241:   LONG x = 0;
          // The count is read through the "previous count" output of a release
          // by zero.  The Win32 documentation requires a release count greater
          // than zero, so this call may be rejected; report that instead of
          // handing back an uninitialised value.
   242:   if (!ReleaseSemaphore(*sem,0,&x))
            return -1;
   243:   *sval = x;
   244:   return 0;
   245: }
   246: 
        // Closes the Win32 handle behind the semaphore *sem.
        // Returns 0 on success and -1 on failure, as POSIX sem_destroy does.
   247: int sem_destroy(sem_t * sem)
   248: {
          // CloseHandle reports success as a non-zero BOOL; translate it.
   249:   return CloseHandle(*sem) ? 0 : -1;
   250: }
   251: 
   252: 
   253: #else
   254: 
   255: //POSIX
   256: #include <time.h>
   257: #include <sys/time.h>
   258: 
        // Waits on cv for at most <us> microseconds by converting the duration
        // into the absolute deadline that pthread_cond_timedwait expects.
        // Returns the result of pthread_cond_timedwait.
   259: int pthread_cond_uswait(
   260:   pthread_cond_t *cv,
   261:   pthread_mutex_t *m,
   262:   unsigned long us
   263: )
   264: {
   265:   timeval tv;
   266:   gettimeofday(&tv,NULL);
          // Seconds and microseconds are added separately: folding the current
          // time into one microsecond count overflows a 32-bit unsigned long.
          // tv_usec and (us % 1000000) are each below one million, so their sum
          // carries at most one second.
          unsigned long usec = (unsigned long)tv.tv_usec + us % 1000000uL;
   269:   timespec ts;
   270:   ts.tv_sec = tv.tv_sec + (time_t)(us / 1000000uL + usec / 1000000uL);
   271:   ts.tv_nsec = (usec % 1000000uL) * 1000;
   272:   return pthread_cond_timedwait(cv,m,&ts);
   273: }
   274: 
   275: #endif
   276: 
End cpp section to pthread/pthread_win_posix_condv_emul.cpp[1]