Fast futex based condition variables


Post by Joe Seigh » Fri, 19 Nov 2004 01:08:25


This is a lock-free futex based condvar. It doesn't do
wait morphing. The speed up hack was simply placing a
sched_yield() after the futex wait. I'll post the code
in follow up posts for anyone who wants to experiment.
I'm not going to support it as it's not clear it's worth it.
Stats on high contention condvar usage were reported earlier.
Here are some stats on a more conventional application.
Disk based filesystem i/o was used, which is buffered,
particularly the file read i/o. Network i/o would have
been a better test.
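
For readers who want to see the shape of the speed up hack before reading fastcv.c, here is a minimal sketch of a futex wait followed by an optional sched_yield(). It is not the code from fastcv.c; the function name wait_then_yield and the do_yield parameter are made up for illustration (the post uses a -D_FASTCV_YIELD compile-time switch instead), and it assumes Linux with the raw futex syscall.

```c
#define _GNU_SOURCE
#include <errno.h>
#include <linux/futex.h>
#include <sched.h>
#include <stddef.h>
#include <sys/syscall.h>
#include <unistd.h>

/*
 * Hypothetical helper: block while *eventcount == seen, then optionally
 * give up the CPU. Returns 0 if the kernel reported a wakeup, otherwise
 * the errno from the futex call (EAGAIN if the value had already
 * changed, EINTR if a signal interrupted the wait).
 */
int wait_then_yield(int *eventcount, int seen, int do_yield) {
    long rc = syscall(SYS_futex, eventcount, FUTEX_WAIT, seen, NULL, NULL, 0);
    int err = (rc == 0) ? 0 : errno;

    /* The "hack": yield right after the wait returns, before the caller
     * goes on to reacquire its mutex. */
    if (do_yield)
        sched_yield();

    return err;
}
```

The caller is still expected to re-check its predicate in a loop, as with any condvar wait.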

Simple producer/consumer copy of a file of 18080 512-byte records,
using 40 buffers.

ru_nvcsw - voluntary context switches
ru_nivcsw - involuntary context switches
cvwait_r - # times producer thread did a cond_wait
cvwait_w - # times consumer thread did a cond_wait
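
The ru_nvcsw and ru_nivcsw names match fields of struct rusage, so counters like these can be read with getrusage(). The sketch below shows one way to do that; the post does not show the actual test harness, so the helper names (read_ctx_switches, print_ctx_switches) and the choice of RUSAGE_SELF are assumptions.

```c
#include <stdio.h>
#include <sys/resource.h>

/* Snapshot of the two context-switch counters used in the stats. */
struct ctx_switches {
    long voluntary;    /* ru_nvcsw  */
    long involuntary;  /* ru_nivcsw */
};

/* Fills *out for the whole process. Returns 0 on success, -1 on failure. */
int read_ctx_switches(struct ctx_switches *out) {
    struct rusage ru;

    if (getrusage(RUSAGE_SELF, &ru) != 0)
        return -1;

    out->voluntary = ru.ru_nvcsw;
    out->involuntary = ru.ru_nivcsw;
    return 0;
}

/* Prints the counters in the same "name = value" layout as the stats. */
void print_ctx_switches(const struct ctx_switches *cs) {
    printf("ru_nvcsw = %ld\n", cs->voluntary);
    printf("ru_nivcsw = %ld\n", cs->involuntary);
}
```

Calling read_ctx_switches() once at the end of the run gives process-wide totals, which is the form the numbers below appear to take.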

NPTL condvars:
time = 1.359141
user time = 0.189971
system time = 1.165823
ru_nvcsw = 54063
ru_nivcsw = 54106
cvwait_r = 18021
cvwait_w = 0

fastcv w/o sched_yield:
time = 1.332471
user time = 0.199970
system time = 1.129828
ru_nvcsw = 36084
ru_nivcsw = 36133
cvwait_r = 18041
cvwait_w = 0

fastcv w/ sched_yield:
time = 0.572876
user time = 0.047993
system time = 0.514922
ru_nvcsw = 582
ru_nivcsw = 1694
cvwait_r = 561
cvwait_w = 6

NPTL semaphores:
time = 1.103147
user time = 0.095986
system time = 1.004847
ru_nvcsw = 17948
ru_nivcsw = 17990
cvwait_r = 0
cvwait_w = 0

NPTL semaphores w/ sched_yield after sem_wait:
time = 0.966496
user time = 0.098985
system time = 0.865868
ru_nvcsw = 460
ru_nivcsw = 37079
cvwait_r = 0
cvwait_w = 0

fast semaphores w/ sched_yield after sem_wait:
time = 0.555656
user time = 0.047993
system time = 0.505923
ru_nvcsw = 557
ru_nivcsw = 1664
cvwait_r = 0
cvwait_w = 0

Single thread read/write single buffer loop copy:
time = 0.559268
user time = 0.017998
system time = 0.539918
ru_nvcsw = 0
ru_nivcsw = 39
cvwait_r = 0
cvwait_w = 0

NPTL condvar w/ sched_yield after pthread_mutex_unlock:
time = 0.927346
user time = 0.099985
system time = 0.825875
ru_nvcsw = 27
ru_nivcsw = 38978
cvwait_r = 0
cvwait_w = 1
nreads = 18080


That last item is kind of strange. Combining it with
sched_yields in other places slows things down. I don't
know what is going on in that case.

Joe Seigh
 
 
 


Post by Joe Seigh » Fri, 19 Nov 2004 01:29:52

It's optimistically named fastcv. The files are fastcv.h and fastcv.c. To enable
the sched_yield, specify -D_FASTCV_YIELD when compiling fastcv.c. You can
specify -include fastcv.h on the application compile to avoid any source code
changes. Library options are -lpthread and -lrt.

Joe Seigh

fastcv.h
--------------------------------------------
#ifndef _FASTCV_H
#define _FASTCV_H

#include <pthread.h>

typedef struct {
    int waiters;    // atomic
    int eventcount; // broadcast events (futex)
} pthread_cond_x_t;

#define pthread_cond_t pthread_cond_x_t
#undef PTHREAD_COND_INITIALIZER
#define PTHREAD_COND_INITIALIZER {0, 0}
#define pthread_cond_wait(c, m) pthread_cond_timedwait_x(c, m, NULL)
#define pthread_cond_timedwait pthread_cond_timedwait_x
#define pthread_cond_signal pthread_cond_signal_x
#define pthread_cond_broadcast pthread_cond_broadcast_x
#define pthread_cond_init pthread_cond_init_x
#define pthread_cond_destroy pthread_cond_destroy_x

extern int pthread_cond_timedwait_x(pthread_cond_t *, pthread_mutex_t *, struct timespec *);
extern int pthread_cond_signal_x(pthread_cond_t *);
extern int pthread_cond_broadcast_x(pthread_cond_t *);
extern int pthread_cond_init_x(pthread_cond_t *, pthread_condattr_t *);
extern int pthread_cond_destroy_x(pthread_cond_t *);

#endif /* _FASTCV_H */

--------------------------------------------
fastcv.c
--------------------------------------------
#include <pthread.h>
#include <errno.h>
#include <limits.h>
#include <string.h>
#include <sys/syscall.h>
#include <linux/futex.h>
#include <sched.h>
#include <time.h>    // clock_gettime, struct timespec
#include <unistd.h>  // syscall

#include <asm/atomic.h>

#define atomic_inc_x(x) atomic_inc((atomic_t *)(x))
#define atomic_add_negative_x(i, x) atomic_add_negative((i), (atomic_t *)(x))

#include "fastcv.h"

//---------------------------------------------------------------------
// futex syscall definitions
//
//---------------------------------------------------------------------
#define futex_wait(futex, val, ts) \
    syscall(SYS_futex, (unsigned long)futex, FUTEX_WAIT, val, (unsigned long)ts, 0)

#define futex_wake(futex, nwake) \
    syscall(SYS_futex, (unsigned long)futex, FUTEX_WAKE, nwake, 0, 0)

//---------------------------------------------------------------------
// cancelation cleanup handler buffer
//
//---------------------------------------------------------------------
struct cbuffer {
    pthread_cond_t *cvar;
    pthread_mutex_t *mutex;
};


//---------------------------------------------------------------------
// abstime_to_reltime --
//
// returns ETIMEDOUT if abstime has passed
//---------------------------------------------------------------------
int abstime_to_reltime(struct timespec *reltime, struct timespec *abstime) {
    struct timespec now;
    int borrow = 0;

    clock_gettime(CLOCK_REALTIME, &now);

    // subtract nanoseconds, borrowing one second if the result is negative
    if ((reltime->tv_nsec = (abstime->tv_nsec - now.tv_nsec)) < 0) {
        borrow = 1;
        reltime->tv_nsec += 1000000000;
    }

    // abstime < now
    if ((reltime->tv_sec = (abstime->tv_sec - now.tv_sec) - borrow) < 0)
        return ETIMEDOUT;
    // abstime == now
    else if (reltime->tv_sec == 0 && reltime->tv_nsec == 0)
        return ETIMEDOUT;
    // abstime > now
    else
        return 0;
}

//----------------
 
 
 


Post by Joe Seigh » Fri, 19 Nov 2004 03:04:29


That was a quick and dirty fast semaphore written with atomic
increment and decrement. You can write it with sem_trywait()
instead. You get a slight time increase to around .65.

Joe Seigh
 
 
 


Post by Sender » Fri, 19 Nov 2004 10:44:44


Nice! I am looking forward to playing around with this code. It will teach
me more about these damn futexes; I don't quite understand them fully.
 
 
 


Post by Joe Seigh » Fri, 19 Nov 2004 11:31:43


It just uses futex_wait and futex_wake, which are fairly straightforward. An
earlier version looped on EINTR if the eventcount was unchanged. NPTL's
version can't loop because it uses wait morphing and the waiting thread
can't know which futex it woke up on. futex_cmp_requeue is a little strange.
You sort of have to do a broadcast if the compare fails because you might have
lost a signal while the threads were temporarily dequeued. I don't know why
they didn't just hard code this into futex_cmp_requeue. It would be fairly
straightforward to add wait morphing to my code. I just got tired of messing
with it.
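
The "loop on EINTR if the eventcount was unchanged" idea can be sketched as below. This is not the earlier fastcv version, which was not posted; wait_for_change is a hypothetical name, the GCC/Clang __atomic builtins stand in for whatever atomics the original used, and a Linux futex syscall is assumed.

```c
#define _GNU_SOURCE
#include <errno.h>
#include <linux/futex.h>
#include <stddef.h>
#include <sys/syscall.h>
#include <unistd.h>

/*
 * Hypothetical helper: wait until *eventcount differs from `seen`.
 * Returns 0 once a change is observed or the kernel reports a wakeup,
 * otherwise an errno value other than EINTR or EAGAIN.
 */
int wait_for_change(int *eventcount, int seen) {
    for (;;) {
        /* Eventcount already advanced: nothing to wait for. */
        if (__atomic_load_n(eventcount, __ATOMIC_ACQUIRE) != seen)
            return 0;

        long rc = syscall(SYS_futex, eventcount, FUTEX_WAIT, seen,
                          NULL, NULL, 0);
        if (rc == 0)
            return 0;          /* woken by FUTEX_WAKE, or spuriously */
        if (errno == EAGAIN)
            return 0;          /* value changed before we could sleep */
        if (errno != EINTR)
            return errno;      /* unexpected failure */

        /* EINTR: a signal interrupted the wait. Loop, and go back to
         * sleep only if the eventcount is still unchanged. */
    }
}
```

This only works because the waiter knows it is still waiting on the same futex word, which is the property the post says wait morphing takes away.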

Also the cancelation logic could be made simpler. I suspect that thread cancelation
is done with a built-in signal. If that is so, then instead of setting up cancelation
handlers and enabling async cancelation, you could just call pthread_testcancel after
reacquiring the lock if futex_wait had exited with EINTR.

But enough fooling around with condvars. It's not my job, there are people who are
paid to do it and it's their job. I'll probably do a "zero overhead" eventcount
based on the futex since it is an eventcount of sorts.

Joe Seigh
 
 
 


Post by Sender » Fri, 19 Nov 2004 18:23:17

> It just uses futex_wait and futex_wake which are fairly straightforward.

I was wondering how futex_wait atomically checks if the futex value is the
same as the expected value. It kind of seems like a cas operation.

Something like this:

int futex_wait( int *futex, int cmp )
{
    int ret;

    hash_futex_lock( futex );

    if ( atomic_cas( futex, cmp, cmp ) == cmp )
    {
        ret = queue_me_and_wait( futex );

    } else { ret = EWOULDBLOCK; }

    hash_futex_unlock( futex );

    return ret;
}


?
 
 
 


Post by Joe Seigh » Fri, 19 Nov 2004 20:57:21


No, it just has to queue (suspend) the waiting thread, fetch the futex
value, and dequeue (resume) the waiting thread if the futex value is
different from the cmp value.

There has to be a store-load barrier between the queue and the futex fetch.

If the dequeue fails because the thread was resumed by something else, that's ok.

If the thread has been dequeued and subsequently requeued before you attempted
the dequeue, that's ok even if the new cmp value has changed from the one
you are using. It's just a spurious wakeup.

There's currently a discussion about this on the kernel mailing list.

Joe
 
 
 


Post by Sender » Fri, 19 Nov 2004 21:17:54

> It just uses futex_wait and futex_wake which are fairly straightforward.

Would the following pseudo-code implement a zero-overhead eventcount?


It should, hummm...





#define EC_WAITERS 1
#define EC_LOCKED 2
#define EC_LOCK_WAITERS 4





typedef struct ec_
{
    volatile LONG count; // LONG to match InterlockedCompareExchange
    HANDLE waitset;      // auto-reset event
    per_thread_t *front;
    per_thread_t *back;

} ec_t;





/* Locks and increments event count */
void prv_ec_lock_inc( ec_t *_this )
{
    register int waited = 0;
    register LONG cmp, xchg, old = _this->count;

    for ( ;; )
    {
        if ( ! ( old & EC_LOCKED ) )
        {
            /* lock and inc */
            xchg = ( old | EC_LOCKED ) + 0x00000010;
        }

        else { xchg = old | EC_LOCK_WAITERS; }

        if ( waited ) { xchg |= EC_LOCK_WAITERS; }

        cmp = old;

        old = InterlockedCompareExchange
            ( &_this->count,
              xchg & ~EC_WAITERS,
              cmp );

        if ( cmp == old )
        {
            if ( ! ( old & EC_LOCKED ) ) { break; }

            if ( WaitForSingleObject
                 ( _this->waitset,
                   INFINITE ) != WAIT_OBJECT_0 )
            {
                abort();
            }

            waited = 1;
            old = _this->count;
        }
    }
}


/* Compares count, and locks if equal */
BOOL prv_ec_lock_cmp( ec_t *_this, LONG cmp )
{
    register int waited = 0;
    /* prev is the CAS comparand; cmp stays the caller's event count */
    register LONG prev, xchg, old = _this->count;

    for ( ;; )
    {
        /* compare event count */
        if ( ( old & 0xFFFFFFF0 ) == cmp )
        {
            if ( ! ( old & EC_LOCKED ) )
            {
                /* lock */
                xchg = old | EC_LOCKED;
            }

            else { xchg = old | EC_LOCK_WAITERS; }
        }

        else { xchg = old; }

        if ( waited ) { xchg |= EC_LOCK_WAITERS; }

        prev = old;

        old = InterlockedCompareExchange
            ( &_this->count,
              xchg,
              prev );

        if ( prev == old )
        {
            if ( ( old & 0xFFFFFFF0 ) != cmp ) { break; }

            else if ( ! ( old & EC_LOCKED ) ) { return TRUE; }

            if ( WaitForSingleObject
                 ( _this->waitset,
                   INFINITE ) != WAIT_OBJECT_0 )
            {
                abort();
            }

            waited = 1;
            old = _this->count;
        }
    }

    return FALSE;
}


/* unlocks */
void prv_ec_unlock( ec_t *_this )
{
    register LONG cmp, old = _this->count;

    do
    {
        cmp = old;

        old = InterlockedCompareExchange
            ( &_this->count,
              old & ~( EC_LOCKED | EC_LOCK_WAITERS ),
              cmp );

    } while ( cmp != old );

    if ( old & EC_LOCK_WAITERS )
    {
        if ( ! SetEvent( _this->waitset ) ) { abort(); }
    }
}





LONG ec_get( ec_t *_this )
{
register LONG cmp, old = _this->co
 
 
 



Post by Joe Seigh » Fri, 19 Nov 2004 22:04:26


[...]

Potential for unnecessary spurious wakeups. They might not be a problem but
I wouldn't have all the resources to test and verify that.



Actually, futexes are basically lock-free eventcounts by themselves. There's
nothing really to add except a waiter count if you want to avoid signaling when
there are no waiters. But you may not want or need a waiter count if there are
other ways to determine whether there are waiters. If the wait condition is an empty
queue or stack then you don't need to signal when you enqueue onto a non-empty
queue. Enqueueing onto an empty queue doesn't necessarily mean there are waiters
but you'd have to do some testing to see how much of a problem that actually is.
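
As a rough illustration of "a futex plus a waiter count", here is a minimal sketch. The type and function names (eventcount_t, ec_get, ec_wait, ec_signal_all) are assumptions made for the example, not code from this thread, and it uses GCC/Clang __atomic builtins with sequentially consistent ordering on Linux.

```c
#define _GNU_SOURCE
#include <limits.h>
#include <linux/futex.h>
#include <stddef.h>
#include <sys/syscall.h>
#include <unistd.h>

typedef struct {
    int waiters;  /* threads currently inside ec_wait() */
    int count;    /* the futex word, bumped on every signal */
} eventcount_t;

/* Read the current event count before checking the wait condition. */
int ec_get(eventcount_t *ec) {
    return __atomic_load_n(&ec->count, __ATOMIC_SEQ_CST);
}

/* Sleep only if the count is still `seen`. May return spuriously, so
 * callers re-check their condition in a loop. */
void ec_wait(eventcount_t *ec, int seen) {
    __atomic_fetch_add(&ec->waiters, 1, __ATOMIC_SEQ_CST);
    /* The kernel compares count with seen before sleeping. */
    syscall(SYS_futex, &ec->count, FUTEX_WAIT, seen, NULL, NULL, 0);
    __atomic_fetch_sub(&ec->waiters, 1, __ATOMIC_SEQ_CST);
}

/* Advance the count; skip the syscall when nobody is waiting.
 * Returns 1 if FUTEX_WAKE was issued, 0 if it was skipped. */
int ec_signal_all(eventcount_t *ec) {
    __atomic_fetch_add(&ec->count, 1, __ATOMIC_SEQ_CST);
    if (__atomic_load_n(&ec->waiters, __ATOMIC_SEQ_CST) == 0)
        return 0;
    syscall(SYS_futex, &ec->count, FUTEX_WAKE, INT_MAX, NULL, NULL, 0);
    return 1;
}
```

The waiter registers itself before the kernel compares the count, and the signaler bumps the count before reading the waiter count, so a signaler that sees zero waiters implies the late waiter will see the new count and not sleep.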

Joe Seigh
 
 
 


Post by Joe Seigh » Sat, 20 Nov 2004 10:05:15


[...]

Maybe they do have a problem.

http://www.yqcomputer.com/

Seems you can wake a thread that was already "woken" up by the cmp value
before it had a chance to do the unqueue_me.

I think you can fix it by waking up another waiting thread if the unqueue_me
fails. Eventually you wake a thread that wasn't already "woken".

Joe Seigh
 
 
 


Post by David Schw » Sat, 20 Nov 2004 10:55:58


If there are at least two threads blocking on pthread_cond_wait, and you
make two calls to pthread_cond_signal, it is not acceptable to wake the same
thread twice (assuming it doesn't come back around and call
pthread_cond_wait again).

DS
 
 
 


Post by Sender » Sat, 20 Nov 2004 14:26:22


Using the eventcount algorithm I posted, you could probably do win32 condvar
waits like this:


typedef ec_t cv_t;


void cv_wait( cv_t *_this, LPCRITICAL_SECTION mutex )
{
    LONG cmp = ec_get( _this );

    LeaveCriticalSection( mutex );

    ec_wait( _this, cmp );

    EnterCriticalSection( mutex );
}


Humm, looks too simple. But it should work fine...
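
The same get / unlock / wait / relock pattern can be tried on Linux with a futex word playing the role of the eventcount. This is a sketch under assumptions, not a port of the win32 code above: cv_broadcast, the demo_* names, and cv_demo are invented for the example, and a pthread mutex replaces the critical section.

```c
#define _GNU_SOURCE
#include <limits.h>
#include <linux/futex.h>
#include <pthread.h>
#include <stddef.h>
#include <sys/syscall.h>
#include <unistd.h>

typedef struct { int count; } cv_t;  /* futex word used as the eventcount */

/* Caller holds `mutex`. May return spuriously; re-check the predicate. */
void cv_wait(cv_t *cv, pthread_mutex_t *mutex) {
    int seen = __atomic_load_n(&cv->count, __ATOMIC_SEQ_CST);   /* ec_get */
    pthread_mutex_unlock(mutex);
    /* ec_wait: sleeps only if no broadcast happened since ec_get. */
    syscall(SYS_futex, &cv->count, FUTEX_WAIT, seen, NULL, NULL, 0);
    pthread_mutex_lock(mutex);
}

void cv_broadcast(cv_t *cv) {
    __atomic_fetch_add(&cv->count, 1, __ATOMIC_SEQ_CST);
    syscall(SYS_futex, &cv->count, FUTEX_WAKE, INT_MAX, NULL, NULL, 0);
}

/* Hypothetical demo state: one flag protected by one mutex. */
static pthread_mutex_t demo_lock = PTHREAD_MUTEX_INITIALIZER;
static cv_t demo_cv;
static int demo_ready;

static void *demo_waiter(void *arg) {
    (void)arg;
    pthread_mutex_lock(&demo_lock);
    while (!demo_ready)              /* predicate loop absorbs spurious wakeups */
        cv_wait(&demo_cv, &demo_lock);
    pthread_mutex_unlock(&demo_lock);
    return NULL;
}

/* Starts a waiter, publishes the condition, and returns 0 once the
 * waiter has observed it and exited. */
int cv_demo(void) {
    pthread_t t;
    if (pthread_create(&t, NULL, demo_waiter, NULL) != 0)
        return -1;
    pthread_mutex_lock(&demo_lock);
    demo_ready = 1;
    pthread_mutex_unlock(&demo_lock);
    cv_broadcast(&demo_cv);
    return pthread_join(t, NULL);
}
```

Reading the count while still holding the mutex is what closes the window between unlocking and sleeping: a broadcast in that window changes the count, so the futex wait returns immediately.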
 
 
 


Post by Sender » Sat, 20 Nov 2004 22:59:36

> Maybe they do have a problem.

That has to be because they queue the thread before they check the cmp value?
 
 
 


Post by Joe Seigh » Sat, 20 Nov 2004 23:34:27


Yeah, basically. They're going to have to go back to the earlier futex implementation
where they held the queue lock when they did the compare first before enqueueing.
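
To make the "compare under the queue lock, then enqueue" ordering concrete, here is a user-space model built from a pthread mutex and condvar. It is only an illustration of the ordering, not kernel code; model_futex_t and its functions are hypothetical names.

```c
#include <errno.h>
#include <pthread.h>

/* Hypothetical user-space stand-in for one futex wait queue. */
typedef struct {
    pthread_mutex_t queue_lock;  /* plays the role of the kernel's queue lock */
    pthread_cond_t  sleepers;    /* stands in for the wait queue itself */
    unsigned        wake_seq;    /* bumped by every wake */
} model_futex_t;

#define MODEL_FUTEX_INIT \
    { PTHREAD_MUTEX_INITIALIZER, PTHREAD_COND_INITIALIZER, 0 }

/* Compare first, while holding the queue lock, and enqueue only if the
 * compare succeeds. Returns 0 after a wake, EWOULDBLOCK on mismatch. */
int model_futex_wait(model_futex_t *f, const int *word, int cmp) {
    int rc = 0;

    pthread_mutex_lock(&f->queue_lock);
    if (__atomic_load_n(word, __ATOMIC_SEQ_CST) != cmp) {
        rc = EWOULDBLOCK;        /* never enqueued, so no wake can hit us */
    } else {
        unsigned seq = f->wake_seq;
        while (f->wake_seq == seq)   /* enqueue and sleep */
            pthread_cond_wait(&f->sleepers, &f->queue_lock);
    }
    pthread_mutex_unlock(&f->queue_lock);
    return rc;
}

/* Wakes every thread currently queued. */
void model_futex_wake_all(model_futex_t *f) {
    pthread_mutex_lock(&f->queue_lock);
    f->wake_seq++;
    pthread_cond_broadcast(&f->sleepers);
    pthread_mutex_unlock(&f->queue_lock);
}
```

Because a thread whose compare fails is never on the queue, a wake cannot be used up on a thread that was about to leave anyway, which is the problem described in the posts above.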

Joe Seigh