@ -104,17 +104,53 @@ impl ParkingSpot {
let spot = inner . get_mut ( & key ) . expect ( "failed to get spot" ) ;
if timed_out {
if let Some ( timeout ) = timeout {
if Instant ::now ( ) < timeout {
// Did not sleep long enough, try again.
continue ;
}
// If waiting on the cvar timed out then due to how system cvars
// are implemented we may need to continue to sleep longer. If
// the deadline has not been reached then turn the crank again
// and go back to sleep.
if Instant ::now ( ) < timeout . unwrap ( ) {
continue ;
}
// Opportunistically consume `to_unpark` signals even on
// timeout. From the perspective of `unpark` this "agent" raced
// between its own timeout and receiving the unpark signal, but
// from unpark's perspective it's definitely going to wake up N
// agents as returned from the `unpark` return value.
//
// Note that this may actually prevent other threads from
// getting unparked. For example:
//
// * Thread A parks with a timeout
// * Thread B parks with no timeout
// * Thread C decides to unpark 1 thread
// * Thread A's cvar wakes up due to a timeout, blocks on the
// lock
// * Thread C finishes unpark and signals the cvar once
// * Thread B wakes up
// * Thread A and B contend for the lock and A wins
// * A consumes the "to_unpark" value
// * B goes back to sleep since `to_unpark == 0`, thinking that
// a spurious wakeup happened.
//
// It's believed that this is ok, however, since from C's
// perspective one agent was still woken up and is allowed to
// continue, notably A in this case. C doesn't know that A raced
// with B and "stole" its wakeup signal.
if spot . to_unpark > 0 {
spot . to_unpark - = 1 ;
}
} else {
if spot . to_unpark = = 0 {
// If no timeout happen but nothing has unparked this spot (as
// signaled through `to_unpark`) then this is indicative of a
// spurious wakeup. In this situation turn the crank again and
// go back to sleep as this interface doesn't allow for spurious
// wakeups.
continue ;
}
// No timeout happened, and some other thread registered to
// unpark this thread, so consume one unpark notification.
spot . to_unpark - = 1 ;
}
@ -176,55 +212,53 @@ impl ParkingSpot {
#[ cfg(test) ]
mod tests {
use super ::ParkingSpot ;
use once_cell ::sync ::Lazy ;
use std ::ptr ::addr_of ;
use std ::sync ::atomic ::{ AtomicU64 , Ordering } ;
use std ::thread ;
static PARKING_SPOT : Lazy < ParkingSpot > = Lazy ::new ( ParkingSpot ::default ) ;
static ATOMIC : AtomicU64 = AtomicU64 ::new ( 0 ) ;
use std ::time ::{ Duration , Instant } ;
#[ test ]
fn atomic_wait_notify ( ) {
let thread1 = thread ::spawn ( | | {
let atomic_key = addr_of ! ( ATOMIC ) as u64 ;
ATOMIC . store ( 1 , Ordering ::SeqCst ) ;
PARKING_SPOT . unpark ( atomic_key , u32 ::MAX ) ;
PARKING_SPOT . park ( atomic_key , | | ATOMIC . load ( Ordering ::SeqCst ) = = 1 , None ) ;
} ) ;
let parking_spot = & ParkingSpot ::default ( ) ;
let atomic = & AtomicU64 ::new ( 0 ) ;
thread ::scope ( | s | {
let atomic_key = addr_of ! ( atomic ) as u64 ;
let thread1 = s . spawn ( move | | {
atomic . store ( 1 , Ordering ::SeqCst ) ;
parking_spot . unpark ( atomic_key , u32 ::MAX ) ;
parking_spot . park ( atomic_key , | | atomic . load ( Ordering ::SeqCst ) = = 1 , None ) ;
} ) ;
let thread2 = s . spawn ( move | | {
while atomic . load ( Ordering ::SeqCst ) ! = 1 {
parking_spot . park ( atomic_key , | | atomic . load ( Ordering ::SeqCst ) ! = 1 , None ) ;
}
atomic . store ( 2 , Ordering ::SeqCst ) ;
parking_spot . unpark ( atomic_key , u32 ::MAX ) ;
parking_spot . park ( atomic_key , | | atomic . load ( Ordering ::SeqCst ) = = 2 , None ) ;
} ) ;
let thread3 = s . spawn ( move | | {
while atomic . load ( Ordering ::SeqCst ) ! = 2 {
parking_spot . park ( atomic_key , | | atomic . load ( Ordering ::SeqCst ) ! = 2 , None ) ;
}
atomic . store ( 3 , Ordering ::SeqCst ) ;
parking_spot . unpark ( atomic_key , u32 ::MAX ) ;
let thread2 = thread ::spawn ( | | {
let atomic_key = addr_of ! ( ATOMIC ) as u64 ;
while ATOMIC . load ( Ordering ::SeqCst ) ! = 1 {
PARKING_SPOT . park ( atomic_key , | | ATOMIC . load ( Ordering ::SeqCst ) ! = 1 , None ) ;
}
ATOMIC . store ( 2 , Ordering ::SeqCst ) ;
PARKING_SPOT . unpark ( atomic_key , u32 ::MAX ) ;
PARKING_SPOT . park ( atomic_key , | | ATOMIC . load ( Ordering ::SeqCst ) = = 2 , None ) ;
} ) ;
parking_spot . park ( atomic_key , | | atomic . load ( Ordering ::SeqCst ) = = 3 , None ) ;
} ) ;
let thread3 = thread ::spawn ( | | {
let atomic_key = addr_of ! ( ATOMIC ) as u64 ;
while ATOMIC . load ( Ordering ::SeqCst ) ! = 2 {
PARKING_SPOT . park ( atomic_key , | | ATOMIC . load ( Ordering ::SeqCst ) ! = 2 , None ) ;
while atomic . load ( Ordering ::SeqCst ) ! = 3 {
parking_spot . park ( atomic_key , | | atomic . load ( Ordering ::SeqCst ) ! = 3 , None ) ;
}
ATOMIC . store ( 3 , Ordering ::SeqCst ) ;
PARKING_SPOT . unpark ( atomic_key , u32 ::MAX ) ;
atomic . store ( 4 , Ordering ::SeqCst ) ;
parking_spot . unpark ( atomic_key , u32 ::MAX ) ;
PARKING_SPOT . park ( atomic_key , | | ATOMIC . load ( Ordering ::SeqCst ) = = 3 , None ) ;
thread1 . join ( ) . unwrap ( ) ;
thread2 . join ( ) . unwrap ( ) ;
thread3 . join ( ) . unwrap ( ) ;
} ) ;
let atomic_key = addr_of ! ( ATOMIC ) as u64 ;
while ATOMIC . load ( Ordering ::SeqCst ) ! = 3 {
PARKING_SPOT . park ( atomic_key , | | ATOMIC . load ( Ordering ::SeqCst ) ! = 3 , None ) ;
}
ATOMIC . store ( 4 , Ordering ::SeqCst ) ;
PARKING_SPOT . unpark ( atomic_key , u32 ::MAX ) ;
thread1 . join ( ) . unwrap ( ) ;
thread2 . join ( ) . unwrap ( ) ;
thread3 . join ( ) . unwrap ( ) ;
}
mod parking_lot {
@ -302,47 +336,53 @@ mod tests {
num_threads : u32 ,
num_single_unparks : u32 ,
) {
let mut tests = Vec ::with_capacity ( num_latches ) ;
for _ in 0 . . num_latches {
let test = Arc ::new ( SingleLatchTest ::new ( num_threads ) ) ;
let mut threads = Vec ::with_capacity ( num_threads as _ ) ;
for _ in 0 . . num_threads {
let test = test . clone ( ) ;
threads . push ( thread ::spawn ( move | | test . run ( ) ) ) ;
let spot = ParkingSpot ::default ( ) ;
thread ::scope ( | s | {
let mut tests = Vec ::with_capacity ( num_latches ) ;
for _ in 0 . . num_latches {
let test = Arc ::new ( SingleLatchTest ::new ( num_threads , & spot ) ) ;
let mut threads = Vec ::with_capacity ( num_threads as _ ) ;
for _ in 0 . . num_threads {
let test = test . clone ( ) ;
threads . push ( s . spawn ( move | | test . run ( ) ) ) ;
}
tests . push ( ( test , threads ) ) ;
}
tests . push ( ( test , threads ) ) ;
}
for unpark_index in 0 . . num_single_unparks {
thread ::sleep ( delay ) ;
for ( test , _ ) in & tests {
test . unpark_one ( unpark_index ) ;
for unpark_index in 0 . . num_single_unparks {
thread ::sleep ( delay ) ;
for ( test , _ ) in & tests {
test . unpark_one ( unpark_index ) ;
}
}
}
for ( test , threads ) in tests {
test . finish ( num_single_unparks ) ;
for thread in threads {
thread . join ( ) . expect ( "Test thread panic" ) ;
for ( test , threads ) in tests {
test . finish ( num_single_unparks ) ;
for thread in threads {
thread . join ( ) . expect ( "Test thread panic" ) ;
}
}
}
} ) ;
}
struct SingleLatchTest {
struct SingleLatchTest < 'a > {
semaphore : AtomicIsize ,
num_awake : AtomicU32 ,
/// Total number of threads participating in this test.
num_threads : u32 ,
spot : & 'a ParkingSpot ,
}
impl SingleLatchTest {
pub fn new ( num_threads : u32 ) -> Self {
impl < 'a > SingleLatchTest < 'a > {
pub fn new ( num_threads : u32 , spot : & 'a ParkingSpot ) -> Self {
Self {
// This implements a fair (FIFO) semaphore, and it starts out unavailable.
semaphore : AtomicIsize ::new ( 0 ) ,
num_awake : AtomicU32 ::new ( 0 ) ,
num_threads ,
spot ,
}
}
@ -373,14 +413,14 @@ mod tests {
// still be threads that has not yet parked.
while num_threads_left > 0 {
let mut num_waiting_on_address = 0 ;
PARKING_SPOT . with_lot ( self . semaphore_addr ( ) , | thread_data | {
self . spot . with_lot ( self . semaphore_addr ( ) , | thread_data | {
num_waiting_on_address = thread_data . num_parked ;
} ) ;
assert ! ( num_waiting_on_address < = num_threads_left ) ;
let num_awake_before_unpark = self . num_awake . load ( Ordering ::SeqCst ) ;
let num_unparked = PARKING_SPOT . unpark ( self . semaphore_addr ( ) , u32 ::MAX ) ;
let num_unparked = self . spot . unpark ( self . semaphore_addr ( ) , u32 ::MAX ) ;
assert ! ( num_unparked > = num_waiting_on_address ) ;
assert ! ( num_unparked < = num_threads_left ) ;
@ -398,7 +438,7 @@ mod tests {
// Make sure no thread is parked on our semaphore address
let mut num_waiting_on_address = 0 ;
PARKING_SPOT . with_lot ( self . semaphore_addr ( ) , | thread_data | {
self . spot . with_lot ( self . semaphore_addr ( ) , | thread_data | {
num_waiting_on_address = thread_data . num_parked ;
} ) ;
assert_eq ! ( num_waiting_on_address , 0 ) ;
@ -414,7 +454,7 @@ mod tests {
// We need to wait.
let validate = | | true ;
PARKING_SPOT . park ( self . semaphore_addr ( ) , validate , None ) ;
self . spot . park ( self . semaphore_addr ( ) , validate , None ) ;
}
pub fn up ( & self ) {
@ -426,7 +466,7 @@ mod tests {
// the thread we want to pass ownership to has decremented the semaphore counter,
// but not yet parked.
loop {
match PARKING_SPOT . unpark ( self . semaphore_addr ( ) , 1 ) {
match self . spot . unpark ( self . semaphore_addr ( ) , 1 ) {
1 = > break ,
0 = > ( ) ,
i = > panic ! ( "Should not wake up {i} threads" ) ,
@ -440,4 +480,42 @@ mod tests {
}
}
}
#[ test ]
fn wait_with_timeout ( ) {
let parking_spot = & ParkingSpot ::default ( ) ;
let atomic = & AtomicU64 ::new ( 0 ) ;
thread ::scope ( | s | {
let atomic_key = addr_of ! ( atomic ) as u64 ;
const N : u64 = 5 ;
const M : u64 = 1000 ;
let thread = s . spawn ( move | | {
while atomic . load ( Ordering ::SeqCst ) ! = N * M {
let timeout = Instant ::now ( ) + Duration ::from_millis ( 1 ) ;
parking_spot . park (
atomic_key ,
| | atomic . load ( Ordering ::SeqCst ) ! = N * M ,
Some ( timeout ) ,
) ;
}
} ) ;
let mut threads = vec ! [ thread ] ;
for _ in 0 . . N {
threads . push ( s . spawn ( move | | {
for _ in 0 . . M {
atomic . fetch_add ( 1 , Ordering ::SeqCst ) ;
parking_spot . unpark ( atomic_key , 1 ) ;
}
} ) ) ;
}
for thread in threads {
thread . join ( ) . unwrap ( ) ;
}
} ) ;
}
}