3.1. Windows Emulation of POSIX Synchronisation Primitives

Start cpp section to pthread/pthread_win_posix_condv_emul.hpp[1 /1 ]
     1: #line 471 "./lpsrc/flx_pthread.pak"
     2: #ifndef __WIN_POSIX_CONDV_EMUL__
     3: #define __WIN_POSIX_CONDV_EMUL__
     4: // Note: no namespaces here!
     5: // See http://www.cs.wustl.edu/~schmidt/win32-cv-1.html
     6: 
     7: #include "flx_pthread_config.hpp"
     8: #ifdef _WIN32
     9: #include <windows.h>
    10: 
    11: typedef HANDLE pthread_mutex_t;
        // A mutex is represented directly by the handle of a Win32 mutex object.
    12: typedef void pthread_mutexattr_t; // do NOT use them!
    13: typedef void pthread_condattr_t; // do NOT use them!
        // The attribute types are placeholders: the init functions accept a
        // pointer to them only so that the POSIX signatures match.
    14: 
    15: struct pthread_cond_t
    16: {
        // State of one emulated condition variable: a waiter count guarded by
        // a critical section, a semaphore the waiters block on, and an event
        // through which the last woken waiter reports back to a broadcaster.
    17:   int waiters_count_;
    18:   // Number of waiting threads.
    19: 
    20:   CRITICAL_SECTION waiters_count_lock_;
    21:   // Serialize access to <waiters_count_>.
    22: 
    23:   HANDLE sema_;
    24:   // Semaphore used to queue up threads waiting for the condition to
    25:   // become signaled.
    26: 
    27:   HANDLE waiters_done_;
    28:   // An auto-reset event used by the broadcast/signal thread to wait
    29:   // for all the waiting thread(s) to wake up and be released from the
    30:   // semaphore.
        // In this implementation only pthread_cond_broadcast waits on it.
    31: 
    32:   size_t was_broadcast_;
    33:   // Keeps track of whether we were broadcasting or signaling.  This
    34:   // allows us to optimize the code if we're just signaling.
    35: };
    36: 
    37: // THIS IS SICK but there ain't no other way in C
        // The timed waits of this emulation return the Win32 wait result
        // unchanged, so ETIMEDOUT is made equal to WAIT_TIMEOUT.
    38: #define ETIMEDOUT WAIT_TIMEOUT
    39: // looks like EAGAIN is available in MinGW, but not in the VS SDK.
    40: #ifndef EAGAIN
    41: #define EAGAIN WAIT_TIMEOUT
    42: #endif
        // NOTE(review): when the toolchain already provides EAGAIN it will
        // presumably differ from WAIT_TIMEOUT, which is the value sem_trywait
        // returns for a busy semaphore -- confirm that callers cope with both.
    43: 
    44: int PTHREAD_EXTERN pthread_mutex_init (pthread_mutex_t*, const pthread_mutexattr_t*);
        // Mutex operations on a Win32 mutex handle; the attribute argument of
        // pthread_mutex_init is ignored by the implementation.
    45: int PTHREAD_EXTERN pthread_mutex_lock(pthread_mutex_t*);
    46: int PTHREAD_EXTERN pthread_mutex_unlock(pthread_mutex_t*);
    47: int PTHREAD_EXTERN pthread_mutex_destroy(pthread_mutex_t*);
    48: 
    49: int PTHREAD_EXTERN pthread_cond_init (pthread_cond_t*, const pthread_condattr_t*);
        // Condition variable operations; the attribute argument of
        // pthread_cond_init is ignored by the implementation.
    50: int PTHREAD_EXTERN pthread_cond_destroy(pthread_cond_t*);
    51: int PTHREAD_EXTERN pthread_cond_wait (pthread_cond_t*, pthread_mutex_t*);
    52: int PTHREAD_EXTERN pthread_cond_timedwait(pthread_cond_t*, pthread_mutex_t*, struct timespec const*);
        // Timed wait taking an absolute deadline, as in POSIX.
    53: int PTHREAD_EXTERN pthread_cond_uswait(pthread_cond_t*, pthread_mutex_t*, unsigned long us);
        // Extension (not POSIX): timed wait taking a duration in microseconds.
    54: int PTHREAD_EXTERN pthread_cond_signal (pthread_cond_t*);
    55: int PTHREAD_EXTERN pthread_cond_broadcast (pthread_cond_t*);
    56: 
    57: 
    58: typedef HANDLE sem_t;
        // A POSIX semaphore is represented by the handle of a Win32 semaphore.
    59: 
    60: int PTHREAD_EXTERN sem_init(sem_t *sem, int pshared, unsigned int value);
        // value is the initial count; the implementation does not use pshared.
    61: int PTHREAD_EXTERN sem_wait(sem_t * sem);
    62: int PTHREAD_EXTERN sem_trywait(sem_t * sem);
        // Non-blocking variant of sem_wait (zero timeout).
    63: int PTHREAD_EXTERN sem_post(sem_t * sem);
    64: int PTHREAD_EXTERN sem_getvalue(sem_t * sem, int * sval);
        // Stores the semaphore count in *sval.
    65: int PTHREAD_EXTERN sem_destroy(sem_t * sem);
    66: 
    67: #else
    68: #include <errno.h>
    69: #include <pthread.h>
    70: #include <semaphore.h>
    71: // On POSIX, emulate the Windows-style wait that takes a duration
        // (in microseconds) instead of an absolute deadline.
    72: int PTHREAD_EXTERN pthread_cond_uswait( pthread_cond_t*, pthread_mutex_t*, unsigned long us);
    73: #endif
    74: #endif
    75: 
End cpp section to pthread/pthread_win_posix_condv_emul.hpp[1]
Start cpp section to pthread/pthread_win_posix_condv_emul.cpp[1 /1 ]
     1: #line 547 "./lpsrc/flx_pthread.pak"
     2: #include "pthread_win_posix_condv_emul.hpp"
     3: #include <assert.h>
     4: #ifdef _WIN32
     5: #include <string.h>
     6: 
     7: struct timespec {
        // Local definition of the POSIX time type taken by
        // pthread_cond_timedwait in the Windows build.
        // NOTE(review): assumes the Windows toolchain headers do not already
        // define struct timespec -- confirm for the SDKs in use.
     8:   unsigned long tv_sec;
        // whole seconds
     9:   unsigned long tv_nsec;
        // additional nanoseconds
    10: };
    11: 
    12: int pthread_mutex_init (pthread_mutex_t *m, const pthread_mutexattr_t*)
    13: {
        // Create an unnamed, initially unowned Win32 mutex and store its
        // handle in *m.  The attribute argument is ignored.
        // Returns 0 on success, EAGAIN if the mutex object cannot be created.
    14:   *m = CreateMutex(NULL,FALSE,NULL);
        // CreateMutex reports failure with a NULL handle; do not present
        // that to the caller as a usable mutex.
          if (*m == NULL) return EAGAIN;
    15:   return 0;
    16: }
    17: 
    18: int pthread_mutex_lock(pthread_mutex_t *m)
    19: {
        // Block until the calling thread owns the mutex *m.
        // Returns 0 on success, or the Win32 error code if the wait fails.
    20:   DWORD rc = WaitForSingleObject(*m,INFINITE);
        // WAIT_ABANDONED still grants ownership, so only WAIT_FAILED is
        // treated as an error.
          if (rc == WAIT_FAILED) return (int)GetLastError();
    21:   return 0;
    22: }
    23: 
    24: int pthread_mutex_unlock(pthread_mutex_t *m)
    25: {
        // Release the calling thread's ownership of the mutex *m.
        // Returns 0 on success, or the Win32 error code if the release fails
        // (for example because the caller does not own the mutex).
    26:   if (!ReleaseMutex(*m)) return (int)GetLastError();
    27:   return 0;
    28: }
    29: 
    30: int pthread_mutex_destroy(pthread_mutex_t *m)
    31: {
        // Close the mutex handle stored in *m.
        // Returns 0 on success, or the Win32 error code if the handle cannot
        // be closed (for example because it is invalid).
    32:   if (!CloseHandle(*m)) return (int)GetLastError();
    33:   return 0;
    34: }
    35: 
    36: int
    37: pthread_cond_init
    38: (
    39:   pthread_cond_t *cv,
    40:   const pthread_condattr_t *
    41: )
    42: {
        // Initialise every member of *cv; the attribute argument is ignored.
        // Returns 0 on success, or EAGAIN if a kernel object cannot be
        // created, in which case nothing is left allocated.
    43:   cv->waiters_count_ = 0;
    44:   cv->was_broadcast_ = 0;
    45:   cv->sema_ = CreateSemaphore
    46:   (
    47:     NULL,       // no security
    48:     0,          // initially 0
    49:     0x7fffffff, // max count .. (I hate limits .. but thats a lot of pthreads)
    50:     NULL        // unnamed
    51:   );
          if (cv->sema_ == NULL) return EAGAIN;
    52:   InitializeCriticalSection (&cv->waiters_count_lock_);
    53:   cv->waiters_done_ = CreateEvent
    54:   (
    55:     NULL,  // no security
    56:     FALSE, // auto-reset
    57:     FALSE, // non-signaled initially
    58:     NULL   // unnamed
    59:   );
          if (cv->waiters_done_ == NULL) {
            // Undo the partial initialisation so that nothing leaks.
            DeleteCriticalSection (&cv->waiters_count_lock_);
            CloseHandle (cv->sema_);
            return EAGAIN;
          }
    60:   return 0;
    61: }
    62: 
    63: int
    64: pthread_cond_destroy(pthread_cond_t *cv)
    65: {
        // Release every resource acquired by pthread_cond_init: both kernel
        // handles and the critical section.  Always returns 0.
    66:   CloseHandle(cv->sema_);
    67:   CloseHandle(cv->waiters_done_);
        // The critical section owns system resources of its own and has to
        // be deleted explicitly, otherwise each destroyed condition variable
        // leaks them.
          DeleteCriticalSection(&cv->waiters_count_lock_);
    68:   return 0;
    69: }
    70: 
    71: // returns ETIMEDOUT = WAIT_TIMEOUT
        // Common implementation of the three public wait functions.
        //   cv             - condition variable to wait on
        //   external_mutex - mutex held by the caller; it is released for the
        //                    duration of the wait and re-acquired before return
        //   ms             - timeout handed to SignalObjectAndWait
        //                    (INFINITE means no timeout)
        // The return value is the result of the first SignalObjectAndWait call.
    72: static int
    73: private_cond_wait
    74: (
    75:   pthread_cond_t *cv,
    76:   pthread_mutex_t *external_mutex,
    77:   unsigned long ms
    78: )
    79: {
    80:   // Avoid race conditions.
    81:   EnterCriticalSection (&cv->waiters_count_lock_);
    82:   cv->waiters_count_++;
    83:   LeaveCriticalSection (&cv->waiters_count_lock_);
    84: 
    85:   // This call atomically releases the mutex and waits on the
    86:   // semaphore until <pthread_cond_signal> or <pthread_cond_broadcast>
    87:   // are called by another thread.
    88:   int res = SignalObjectAndWait (*external_mutex, cv->sema_, ms, FALSE);
        // NOTE(review): if this wait times out while a broadcast is in
        // progress, the semaphore count released for this waiter presumably
        // stays behind -- confirm whether a later waiter can then wake
        // spuriously.
    89: 
    90:   // Reacquire lock to avoid race conditions.
    91:   EnterCriticalSection (&cv->waiters_count_lock_);
    92: 
    93:   // We're no longer waiting...
    94:   cv->waiters_count_--;
    95: 
    96:   // Check to see if we're the last waiter after <pthread_cond_broadcast>.
    97:   int last_waiter = cv->was_broadcast_ && cv->waiters_count_ == 0;
    98: 
    99:   LeaveCriticalSection (&cv->waiters_count_lock_);
   100: 
   101:   // If we're the last waiter thread during this particular broadcast
   102:   // then let all the other threads proceed.
   103:   if (last_waiter)
   104:     // This call atomically signals the <waiters_done_> event and waits until
   105:     // it can acquire the <external_mutex>.  This is required to ensure fairness.
   106:     SignalObjectAndWait (cv->waiters_done_, *external_mutex, INFINITE, FALSE);
   107:   else
   108:     // Always regain the external mutex since that's the guarantee we
   109:     // give to our callers.
   110:     WaitForSingleObject (*external_mutex, INFINITE);
        // The results of both re-acquisition calls are not checked.
   111:   return res;
   112: }
   113: 
   114: int
   115: pthread_cond_wait
   116: (
   117:   pthread_cond_t *cv,
   118:   pthread_mutex_t *external_mutex
   119: )
   120: {
        // Untimed wait: delegate to the shared wait routine with no timeout
        // and pass its result straight back.
          const unsigned long no_timeout = INFINITE;
   121:   return private_cond_wait(cv,external_mutex,no_timeout);
   122: }
   123: 
   124: // Posix is a pain in the butt here
   125: // we have to get the current time and subtract it
   126: // from the target time to get a duration
   127: // the pain is that we probably wanted a duration
   128: // and had to construct a target time by adding it
   129: // to the current time
   130: //
   131: // to fix this we add the native Windows mode (a duration)
   132: // to posix
   133: 
   134: int
   135: pthread_cond_uswait
   136: (
   137:   pthread_cond_t *cv,
   138:   pthread_mutex_t *external_mutex,
   139:   unsigned long us
   140: )
   141: {
        // Wait on cv for at most <us> microseconds; external_mutex is
        // released during the wait and held again on return.
        // Returns whatever private_cond_wait returns (WAIT_TIMEOUT, which is
        // ETIMEDOUT here, when the time runs out).
   142: 
   143:   // Windows waits in ms, ours in us: convert by dividing.  Round up so
        // that a non-zero request never degenerates into a zero-length poll.
        // The largest possible result is far below INFINITE, so a finite
        // request can never turn into an endless wait.
   144:   return private_cond_wait(cv,external_mutex,us / 1000 + (us % 1000 != 0));
   145: }
   146: 
   147: int
   148: pthread_cond_timedwait
   149: (
   150:   pthread_cond_t *cv,
   151:   pthread_mutex_t *external_mutex,
   152:   struct timespec const *abstime
   153: )
   154: {
        // Wait on cv until it is signalled or the absolute time *abstime has
        // passed.  abstime is interpreted as POSIX specifies: seconds and
        // nanoseconds since 1970-01-01 00:00:00 UTC.
        // Returns whatever private_cond_wait returns (WAIT_TIMEOUT, which is
        // ETIMEDOUT here, when the deadline passes).
        //
        // All arithmetic is done in 64 bits and in FILETIME units (100ns
        // ticks since 1601-01-01 UTC); only the final remaining time is
        // converted to the milliseconds the Win32 wait functions expect.
   156:   SYSTEMTIME tod;
   157:   GetSystemTime(&tod);
   158:   FILETIME ft;
   159:   SystemTimeToFileTime(&tod,&ft);
   160:   ULARGE_INTEGER now;  // so we can do some maths
          now.LowPart = ft.dwLowDateTime;
          now.HighPart = ft.dwHighDateTime;

          // Number of 100ns ticks between the FILETIME epoch (1601) and the
          // Unix epoch (1970): 11644473600 seconds.
          const ULONGLONG epoch_delta = (ULONGLONG)116444736 * 1000000000;
          const ULONGLONG target =
            (ULONGLONG)abstime->tv_sec * 10000000 + abstime->tv_nsec / 100 + epoch_delta;

          // A deadline that has already passed becomes a zero timeout.
          unsigned long timeout = 0;
          if (target > now.QuadPart) {
            // Round up to whole milliseconds so we never wake early, and keep
            // the value below INFINITE, which would mean "no timeout".
            const ULONGLONG ms = (target - now.QuadPart + 9999) / 10000;
            timeout = ms < INFINITE ? (unsigned long)ms : INFINITE - 1;
          }
   165:   return private_cond_wait(cv,external_mutex,timeout);
   166: }
   167: 
   168: int
   169: pthread_cond_signal (pthread_cond_t *cv)
   170: {
        // Wake one thread waiting on cv, if there is one.  Always returns 0.
        // The waiter count is sampled under its lock; the semaphore is
        // released after the lock has been dropped.
   171:   EnterCriticalSection (&cv->waiters_count_lock_);
   172:   const bool anyone_waiting = cv->waiters_count_ > 0;
   173:   LeaveCriticalSection (&cv->waiters_count_lock_);
   174: 
   175:   // With nobody waiting there is nothing to release.
          if (!anyone_waiting)
            return 0;
   176: 
   177:   ReleaseSemaphore (cv->sema_, 1, NULL);
   178:   return 0;
   179: }
   180: 
   181: int
   182: pthread_cond_broadcast (pthread_cond_t *cv)
   183: {
        // Wake every thread currently waiting on cv and block until the last
        // of them has reported in through <waiters_done_>.  With no waiters
        // this only takes and drops the lock.  Always returns 0.
   184:   // This is needed to ensure that <waiters_count_> and <was_broadcast_> are
   185:   // consistent relative to each other.
   186:   EnterCriticalSection (&cv->waiters_count_lock_);
   187:   int have_waiters = 0;
   188: 
   189:   if (cv->waiters_count_ > 0) {
   190:     // We are broadcasting, even if there is just one waiter...
   191:     // Record that we are broadcasting, which helps optimize
   192:     // <pthread_cond_wait> for the non-broadcast case.
   193:     cv->was_broadcast_ = 1;
   194:     have_waiters = 1;
   195:   }
   196: 
   197:   if (have_waiters) {
   198:     // Wake up all the waiters atomically.
   199:     ReleaseSemaphore (cv->sema_, cv->waiters_count_, 0);
   200: 
   201:     LeaveCriticalSection (&cv->waiters_count_lock_);
   202: 
   203:     // Wait for all the awakened threads to acquire the counting
   204:     // semaphore.
        // The event is set by the last waiter in private_cond_wait.
   205:     WaitForSingleObject (cv->waiters_done_, INFINITE);
   206:     // This assignment is okay, even without the <waiters_count_lock_> held
   207:     // because no other waiter threads can wake up to access it.
   208:     cv->was_broadcast_ = 0;
   209:   }
   210:   else
   211:     LeaveCriticalSection (&cv->waiters_count_lock_);
   212:   return 0;
   213: }
   214: 
   215: int sem_init(sem_t *sem, int pshared, unsigned int value)
   216: {
        // Create an unnamed Win32 semaphore with initial count <value> and a
        // maximum count of 0x7FFFFFFF, and store its handle in *sem.
        // pshared is not used.
        // Returns 0 on success, -1 if the semaphore cannot be created.
   217:   *sem = CreateSemaphore(NULL,value,0x7FFFFFFF,NULL);
        // A NULL handle means creation failed; report it instead of claiming
        // success.
          if (*sem == NULL) return -1;
   218:   return 0;
   219: }
   220: 
   221: int sem_wait(sem_t * sem)
   222: {
        // Block until the semaphore can be decremented.  The raw Win32 wait
        // result is handed back to the caller unchanged.
          const DWORD wait_result = WaitForSingleObject(*sem,INFINITE);
   223:   return wait_result;
   224: }
   225: 
   226: int sem_trywait(sem_t * sem)
   227: {
        // Try to decrement the semaphore without blocking (zero timeout).
        // The raw Win32 wait result is handed back to the caller unchanged.
          const DWORD poll_result = WaitForSingleObject(*sem,0);
   228:   return poll_result;
   229: }
   230: 
   231: int sem_post(sem_t * sem)
   232: {
        // Increment the semaphore count by one.
        // Returns 0 on success and -1 on failure, as POSIX sem_post does.
        // ReleaseSemaphore itself returns non-zero on success, so its result
        // has to be translated rather than returned directly.
   233:   return ReleaseSemaphore(*sem,1,NULL) ? 0 : -1;
   234: }
   235: 
   236: int sem_getvalue(sem_t * sem, int * sval)
   237: {
        // Store a snapshot of the semaphore count in *sval.
        // Returns 0 on success, -1 if the semaphore cannot be queried.
        //
        // ReleaseSemaphore requires a release count greater than zero, so it
        // cannot be used as a pure query.  Instead, try to take one unit
        // without blocking and give it straight back: the release reports the
        // count before it, which is one less than the count observed.
        // The count is briefly one lower while this runs.
          const DWORD rc = WaitForSingleObject(*sem,0);
          if (rc == WAIT_TIMEOUT) {
            // Nothing available: the count is zero.
            *sval = 0;
            return 0;
          }
          if (rc != WAIT_OBJECT_0) return -1;
   238:   LONG x = 0;
   239:   if (!ReleaseSemaphore(*sem,1,&x)) return -1;
   240:   *sval = (int)x + 1;
   241:   return 0;
   242: }
   243: 
   244: int sem_destroy(sem_t * sem)
   245: {
        // Close the semaphore handle stored in *sem.
        // Returns 0 on success and -1 on failure, as POSIX sem_destroy does.
        // CloseHandle itself returns non-zero on success, so its result has
        // to be translated rather than returned directly.
   246:   return CloseHandle(*sem) ? 0 : -1;
   247: }
   248: 
   249: 
   250: #else
   251: 
   252: //POSIX
   253: #include <time.h>
   254: #include <sys/time.h>
   255: 
   256: int pthread_cond_uswait(
   257:   pthread_cond_t *cv,
   258:   pthread_mutex_t *m,
   259:   unsigned long us
   260: )
   261: {
        // Wait on cv for at most <us> microseconds by turning the duration
        // into the absolute deadline pthread_cond_timedwait expects.
        // Returns the result of pthread_cond_timedwait.
   262:   timeval tv;
   263:   gettimeofday(&tv,NULL);
        // Add seconds and microseconds separately.  Folding the current time
        // into a single microsecond count overflows an unsigned long on
        // platforms where that type is 32 bits wide.
   266:   timespec ts;
   267:   ts.tv_sec = tv.tv_sec + (time_t)(us / 1000000uL);
          unsigned long usec = (unsigned long)tv.tv_usec + us % 1000000uL; // below 2000000
          if (usec >= 1000000uL) {
            // Carry the overflowing microseconds into the seconds field.
            ++ts.tv_sec;
            usec -= 1000000uL;
          }
   268:   ts.tv_nsec = usec * 1000;
   269:   return pthread_cond_timedwait(cv,m,&ts);
   270: }
   271: 
   272: #endif
   273: 
End cpp section to pthread/pthread_win_posix_condv_emul.cpp[1]