Overview
In this lecture, we discuss condition variables, sleep–wakeup races, and lock granularity.
The problem is waking up
- std::mutex helps provide mutual exclusion
- std::mutex blocks
  - Thread blocks in lock() until it obtains the lock
- std::mutex cannot model all forms of blocking
  - How to wait until a condition is true?
Example: Torch relay
relay runner:
wait until someone lights my torch;
run to partner;
light partner’s torch;
douse my torch;
do it again
Polling torch relay
struct runner {
    std::atomic<bool> torch_lit = false;
};
void runner_thread(runner* self, runner* partner) {
    while (!done) {
        while (!self->torch_lit) {
            // spin until someone lights my torch
        }
        while (partner->torch_lit) {
            // spin until partner's torch is out
        }
        ++nhandoffs;
        partner->torch_lit = true;
        self->torch_lit = false;
    }
}
- Why is the second loop necessary?
Blocking torch relay, try 1
struct runner {
    bool torch_lit = false;
    std::mutex m;
};
void runner_thread(runner* self, runner* partner) {
    while (!done) {
        self->m.lock();
        while (!self->torch_lit) {
            // spins while still holding self->m
        }
        self->m.unlock();
        partner->m.lock();
        while (partner->torch_lit) {
            // spins while still holding partner->m
        }
        ++nhandoffs;
        partner->torch_lit = true;
        partner->m.unlock();
        self->m.lock();
        self->torch_lit = false;
        self->m.unlock();
    }
}
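Deadlock! A runner spins while holding its own mutex, so its partner can never lock that mutex to light the torch.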
Conditions for deadlock
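- Mutual exclusion
  - Each resource is held by at most one thread
- Hold and wait
  - A thread holds one resource while attempting to acquire another
- Blocking wait (no preemption)
  - A thread attempting to acquire a resource keeps waiting forever; the resource cannot be taken away from its holder
- Circular wait
  - There is a cycle of threads, each waiting for a resource held by the next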
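One common way to rule out a deadlock is to break the circular-wait condition. The sketch below assumes C++17 and uses std::scoped_lock, which acquires several mutexes with a deadlock-avoidance algorithm. The account type and the transfer and run_transfers functions are hypothetical names made up for this illustration; they are not part of the torch relay.

```cpp
#include <cassert>
#include <mutex>
#include <thread>

// Hypothetical example type: a balance protected by its own mutex.
struct account {
    long balance = 0;
    std::mutex m;
};

// Move `amount` from `from` to `to`. std::scoped_lock locks both mutexes
// together, so two threads that name the accounts in opposite orders
// cannot end up waiting on each other in a cycle.
void transfer(account& from, account& to, long amount) {
    assert(&from != &to);
    std::scoped_lock guard(from.m, to.m);
    from.balance -= amount;
    to.balance += amount;
}

// Run two threads that transfer in opposite directions and return the
// combined balance, which transfers never change.
long run_transfers(int rounds) {
    account a, b;
    a.balance = 1000;
    b.balance = 1000;
    std::thread t1([&] {
        for (int i = 0; i != rounds; ++i) transfer(a, b, 1);
    });
    std::thread t2([&] {
        for (int i = 0; i != rounds; ++i) transfer(b, a, 1);
    });
    t1.join();
    t2.join();
    return a.balance + b.balance;
}
```

If each thread instead locked from.m and then to.m with separate lock() calls, all four deadlock conditions could hold at once and the two threads could block forever.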
Pulse the mutex to avoid deadlock
- Unlocking & then re-locking a mutex gives other threads a chance to run
struct runner {
    bool torch_lit = false;
    std::mutex m;
};
void runner_thread(runner* self, runner* partner) {
    while (!done) {
        self->m.lock();
        while (!self->torch_lit) {
            // pulse: briefly release the mutex so the partner can lock it
            self->m.unlock();
            self->m.lock();
        }
        self->m.unlock();
        ...
Blocking beyond mutual exclusion
- The runner_thread should wait until a condition changes
  - Wait for my torch to be lit
  - Wait for partner’s torch to not be lit
- A mutex is the wrong abstraction for this kind of blocking
Condition variables
- The condition variable synchronization object can block until a condition changes
- C++ std::condition_variable_any
- cv.wait(std::mutex& m) (may be other kind of lock)
  - m must be locked by this thread
  - In one atomic step, release the mutex (m.unlock()) and block until cv.notify_all() is called
  - After it wakes up, cv.wait re-acquires the mutex (m.lock()) before returning
- cv.notify_all()
  - Wake up all threads blocking in cv.wait()
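The wait and notify_all operations described above can be tried in isolation. This is a minimal sketch, not the lecture's code: flag_box, wait_for_value, publish_value, and demo_wait_notify are hypothetical names chosen for the example.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <thread>

// Hypothetical shared state: a flag and a value guarded by `m`.
struct flag_box {
    bool ready = false;
    int value = 0;
    std::mutex m;
    std::condition_variable_any cv;
};

// Block until another thread sets `ready`, then return the value.
int wait_for_value(flag_box& box) {
    box.m.lock();
    while (!box.ready) {
        // Atomically unlocks box.m and blocks; re-locks before returning.
        box.cv.wait(box.m);
    }
    int v = box.value;
    box.m.unlock();
    return v;
}

// Change the condition while holding the mutex, then wake all waiters.
void publish_value(flag_box& box, int v) {
    box.m.lock();
    box.value = v;
    box.ready = true;
    box.m.unlock();
    box.cv.notify_all();
}

// Start one waiter and one publisher; return what the waiter saw.
int demo_wait_notify() {
    flag_box box;
    int result = 0;
    std::thread waiter([&] { result = wait_for_value(box); });
    std::thread publisher([&] { publish_value(box, 42); });
    waiter.join();
    publisher.join();
    return result;
}
```

The result does not depend on which thread runs first: if the publisher wins, the waiter sees ready already set and never blocks.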
Condition variable runner
struct runner {
    bool torch_lit = false;
    std::mutex m;
    std::condition_variable_any cv;
};
void runner_thread(runner* self, runner* partner) {
    while (!done) {
        self->m.lock();
        while (!self->torch_lit) {
            // atomically unlock self->m and block; re-lock on wakeup
            self->cv.wait(self->m);
        }
        self->m.unlock();
        ...
        partner->cv.notify_all();
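For reference, here is one way the elided parts of the runner could be filled in to give a complete relay. This is a sketch under assumptions, not the lecture's actual code: the fixed rounds parameter (in place of the done flag), the run_relay driver, and the choice to notify self->cv after dousing are all made up for this example.

```cpp
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <thread>

struct runner {
    bool torch_lit = false;
    std::mutex m;                     // protects torch_lit
    std::condition_variable_any cv;   // notified whenever torch_lit changes
};

std::atomic<int> nhandoffs{0};

// Assumption: each runner performs a fixed number of handoffs.
void runner_thread(runner* self, runner* partner, int rounds) {
    for (int i = 0; i != rounds; ++i) {
        // Wait until someone lights my torch.
        self->m.lock();
        while (!self->torch_lit) {
            self->cv.wait(self->m);
        }
        self->m.unlock();

        // Wait until partner's torch is out, then light it.
        partner->m.lock();
        while (partner->torch_lit) {
            partner->cv.wait(partner->m);
        }
        ++nhandoffs;
        partner->torch_lit = true;
        partner->m.unlock();
        partner->cv.notify_all();

        // Douse my torch; my partner may be waiting for this.
        self->m.lock();
        self->torch_lit = false;
        self->m.unlock();
        self->cv.notify_all();
    }
}

// Hypothetical driver: runner `a` starts with a lit torch.
int run_relay(int rounds) {
    nhandoffs = 0;
    runner a, b;
    a.torch_lit = true;
    std::thread ta(runner_thread, &a, &b, rounds);
    std::thread tb(runner_thread, &b, &a, rounds);
    ta.join();
    tb.join();
    assert(a.torch_lit && !b.torch_lit);   // torch ends where it started
    return nhandoffs;
}
```

Each runner's condition variable serves two conditions here ("my torch is lit" and "my torch is out"), which works because every waiter re-checks its own condition in a loop and every change uses notify_all.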
Programming with condition variables
- Each condition variable is associated in the programmer’s mind with a condition (some Boolean expression)
- This condition should be protected by the mutex argument to cv.wait()
  - Meaning the condition will not change while m is locked
- Usage pattern
    m.lock();
    while (!condition_holds()) {
        cv.wait(m);
    }
    ...
- Any time the condition changes, cv.notify_all() must be called (cv.notify_one() in special situations)
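The usage pattern above can also be written with the predicate overload of wait, which runs the same loop internally. The sketch below is an illustration, not the lecture's code; it assumes std::condition_variable with std::unique_lock, and int_queue and sum_through_queue are hypothetical names.

```cpp
#include <cassert>
#include <condition_variable>
#include <deque>
#include <mutex>
#include <thread>

// Hypothetical example: an unbounded queue whose condition is "not empty".
struct int_queue {
    std::deque<int> items;
    std::mutex m;                 // protects `items`
    std::condition_variable cv;   // condition: !items.empty()

    void push(int x) {
        {
            std::unique_lock<std::mutex> guard(m);
            items.push_back(x);   // the condition may have changed...
        }
        cv.notify_all();          // ...so notify
    }

    int pop() {
        std::unique_lock<std::mutex> guard(m);
        // Equivalent to: while (items.empty()) { cv.wait(guard); }
        cv.wait(guard, [this] { return !items.empty(); });
        int x = items.front();
        items.pop_front();
        return x;
    }
};

// Push 1..n from one thread, pop n values in another, return their sum.
int sum_through_queue(int n) {
    assert(n >= 0);
    int_queue q;
    int sum = 0;
    std::thread consumer([&] {
        for (int i = 0; i != n; ++i) sum += q.pop();
    });
    std::thread producer([&] {
        for (int i = 1; i <= n; ++i) q.push(i);
    });
    producer.join();
    consumer.join();
    return sum;
}
```

The predicate form makes it hard to forget the loop, which is why many codebases prefer it over a bare wait call.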
The importance of atomicity
- The std::mutex argument to cv.wait() and the wait operation’s atomicity are extremely important
- Imagine if it weren’t atomic
A bad condition variable interface
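void runner_thread(runner* self, runner* partner) {
    while (!done) {
        self->m.lock();
        while (!self->torch_lit) {
            self->m.unlock();
            // Race window: if the partner lights the torch and calls
            // notify_all() here, the notification is missed and the
            // wait() below may block forever.
            self->cv.wait();    // hypothetical wait() that takes no mutex
            self->m.lock();
        }
        self->m.unlock();
        partner->m.lock();
        ...
        partner->torch_lit = true;
        partner->cv.notify_all();
        partner->m.unlock();
    }
}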
Sleep–wakeup race
- This race condition is so important that it has a name: sleep–wakeup race
- A thread that does not wake up when it should has experienced a lost wakeup
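A condition variable does not remember notifications, so the information has to live in the mutex-protected state. The following sketch (hypothetical names torch, light, wait_until_lit, and waits_after_early_notify) delivers the notification before any thread waits and counts how often the waiter actually blocks.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <thread>

struct torch {
    bool lit = false;
    std::mutex m;                     // protects `lit`
    std::condition_variable_any cv;
};

// Light the torch and notify any waiters.
void light(torch& t) {
    t.m.lock();
    t.lit = true;
    t.m.unlock();
    t.cv.notify_all();
}

// Wait until the torch is lit; return how many times wait() was called.
int wait_until_lit(torch& t) {
    int nwaits = 0;
    t.m.lock();
    while (!t.lit) {
        ++nwaits;
        t.cv.wait(t.m);
    }
    t.m.unlock();
    return nwaits;
}

// The notification is sent before the waiter even starts.
int waits_after_early_notify() {
    torch t;
    light(t);
    int nwaits = -1;
    std::thread waiter([&] { nwaits = wait_until_lit(t); });
    waiter.join();
    return nwaits;
}
```

The waiter finishes without blocking because it checks the flag under the mutex first. A waiter that called wait unconditionally would sleep through the earlier notification, which is exactly a lost wakeup.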
Lost and spurious wakeups
- Lost wakeup
  - Thread blocks forever though it should not
  - Serious problem, solved by condition variables
- Spurious wakeup
  - Thread wakes up though it should not
  - Unavoidable, solved by loops
  - Minimize them for efficiency
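To see why the loop handles wakeups that arrive too early, consider several threads sharing one condition variable while each waits for a different condition. In this sketch (run_in_turn is a hypothetical name), notify_all wakes every waiter, but only the thread whose turn it is proceeds; the others re-check and wait again.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <thread>
#include <vector>

// Start `nthreads` threads that must run in order of their ids.
// Returns the order in which the threads actually proceeded.
std::vector<int> run_in_turn(int nthreads) {
    std::mutex m;                     // protects `turn` and `order`
    std::condition_variable_any cv;   // notified whenever `turn` changes
    int turn = 0;
    std::vector<int> order;
    std::vector<std::thread> threads;
    for (int id = 0; id != nthreads; ++id) {
        threads.emplace_back([&, id] {
            m.lock();
            while (turn != id) {
                // A wakeup may arrive when it is still not our turn.
                // The loop re-checks the condition and waits again.
                cv.wait(m);
            }
            order.push_back(id);
            ++turn;
            m.unlock();
            cv.notify_all();          // the condition changed for others
        });
    }
    for (auto& t : threads) {
        t.join();
    }
    return order;
}
```

Replacing the while with an if would let a thread proceed after any wakeup, whether or not its turn had come.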
Condition variables and predicates
- A std::mutex protects state from data races
- A std::condition_variable waits for a condition or predicate
  - Example: “Is the torch lit?”
- The lock passed to wait() should protect the state necessary to compute the predicate
- Any code that might modify the predicate must call notify_all()
Why aren’t mutexes enough?
- mutex::lock() blocks; can we use mutexes in a funky way to implement some condition variables?
  - Example: runner::unlit_mutex, locked when this thread’s torch is not lit
self->unlit_mutex.lock();
assert(self->torch_lit);
self->m.lock();
partner->m.lock();
assert(!partner->torch_lit);
partner->torch_lit = true;
partner->unlit_mutex.unlock(); // undefined behavior: a std::mutex must be unlocked by the thread that locked it
self->torch_lit = false;
self->m.unlock();
self->unlit_mutex.lock();
Cache alignment and lock granularity
Minipipe: The pipe for communication between threads
- write: Write to pipe
  - Block until there’s room to write or read end closed
- read: Read from pipe
  - Block until there’s data to read or write end closed
- close_write, close_read
minipipe
struct minipipe {
    char pbuf;
    bool full = false;
    bool read_closed = false;
    bool write_closed = false;
    ssize_t write(const char* buf, size_t sz);
    void close_write();
    ssize_t read(char* buf, size_t sz);
    void close_read();
};
minipipe::write
// minipipe::write(buf, sz)
//    Write up to `sz` bytes from `buf` into the minipipe.
//    Return value `ret` is:
//    * `0 < ret <= sz`: Successfully wrote `ret` bytes.
//    * `ret == -1 && errno == EPIPE`: Read end closed.
//    * `ret == -1 && errno == EAGAIN`: Minipipe full.
//    * `ret == 0`: `sz == 0`.
ssize_t minipipe::write(const char* buf, size_t sz) {
    assert(!this->write_closed);
    if (sz == 0) {
        return 0;
    } else if (this->read_closed) {
        errno = EPIPE;
        return -1;
    } else if (this->full) {
        errno = EAGAIN;
        return -1;
    } else {
        this->pbuf = buf[0];
        this->full = true;
        return 1;
    }
}
Write example
void writer_thread(minipipe& minib, const char* msg) {
    size_t pos = 0, len = strlen(msg);
    while (pos != len) {
        ssize_t nw = minib.write(msg + pos, len - pos);
        ++nwrites;
        if (nw > 0) {
            pos += nw;
        } else if (nw == -1 && errno != EAGAIN) {
            fprintf(stderr, "writer_thread: %s\n", strerror(errno));
            break;
        }
    }
    minib.close_write();
}
minipipe-poll
Question
- How to eliminate data races?
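One possible answer, sketched under assumptions: give the pipe a mutex and hold it in every member function that touches the shared fields. The name locked_minipipe is hypothetical, and the read semantics (return 0 at end of file, -1 with EAGAIN when empty) are assumed by analogy with write, since the slides do not show read. The code relies on POSIX for ssize_t.

```cpp
#include <cassert>
#include <cerrno>
#include <cstddef>
#include <mutex>
#include <sys/types.h>   // ssize_t (POSIX)

// Hypothetical variant of minipipe: still polling, but free of data races.
struct locked_minipipe {
    std::mutex m;        // protects every field below
    char pbuf = 0;
    bool full = false;
    bool read_closed = false;
    bool write_closed = false;

    ssize_t write(const char* buf, size_t sz) {
        std::scoped_lock guard(m);
        assert(!write_closed);
        if (sz == 0) return 0;
        if (read_closed) { errno = EPIPE; return -1; }
        if (full) { errno = EAGAIN; return -1; }
        pbuf = buf[0];
        full = true;
        return 1;
    }

    // Assumed semantics: 1 on success, 0 at end of file, -1/EAGAIN if empty.
    ssize_t read(char* buf, size_t sz) {
        std::scoped_lock guard(m);
        assert(!read_closed);
        if (sz == 0) return 0;
        if (full) { buf[0] = pbuf; full = false; return 1; }
        if (write_closed) return 0;
        errno = EAGAIN;
        return -1;
    }

    void close_write() {
        std::scoped_lock guard(m);
        write_closed = true;
    }

    void close_read() {
        std::scoped_lock guard(m);
        read_closed = true;
    }
};
```

Callers still have to retry on EAGAIN, as writer_thread does, so this version removes the data races but keeps the polling.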
Question
- How to eliminate data races and block?
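A blocking version can combine the mutex with a condition variable, following the usage pattern from earlier in the lecture. This is a sketch, not the lecture's solution: blocking_minipipe and copy_through_pipe are hypothetical names, a single condition variable is assumed for all state changes, and read is assumed to return 0 at end of file.

```cpp
#include <cassert>
#include <cerrno>
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <string>
#include <sys/types.h>   // ssize_t (POSIX)
#include <thread>

struct blocking_minipipe {
    std::mutex m;                     // protects every field below
    std::condition_variable_any cv;   // notified when any field changes
    char pbuf = 0;
    bool full = false;
    bool read_closed = false;
    bool write_closed = false;

    // Block until there is room to write or the read end is closed.
    ssize_t write(const char* buf, size_t sz) {
        std::unique_lock<std::mutex> guard(m);
        assert(!write_closed);
        if (sz == 0) return 0;
        while (full && !read_closed) cv.wait(guard);
        if (read_closed) { errno = EPIPE; return -1; }
        pbuf = buf[0];
        full = true;
        cv.notify_all();
        return 1;
    }

    // Block until there is data to read or the write end is closed.
    ssize_t read(char* buf, size_t sz) {
        std::unique_lock<std::mutex> guard(m);
        assert(!read_closed);
        if (sz == 0) return 0;
        while (!full && !write_closed) cv.wait(guard);
        if (!full) return 0;          // write end closed, nothing left
        buf[0] = pbuf;
        full = false;
        cv.notify_all();
        return 1;
    }

    void close_write() {
        std::unique_lock<std::mutex> guard(m);
        write_closed = true;
        cv.notify_all();
    }

    void close_read() {
        std::unique_lock<std::mutex> guard(m);
        read_closed = true;
        cv.notify_all();
    }
};

// Send `msg` through the pipe one byte at a time; return what arrived.
std::string copy_through_pipe(const std::string& msg) {
    blocking_minipipe pipe;
    std::string out;
    std::thread reader([&] {
        char ch;
        while (pipe.read(&ch, 1) == 1) out.push_back(ch);
    });
    size_t pos = 0;
    while (pos != msg.size()) {
        ssize_t nw = pipe.write(msg.data() + pos, msg.size() - pos);
        if (nw <= 0) break;
        pos += nw;
    }
    pipe.close_write();
    reader.join();
    return out;
}
```

Closing either end also changes a waiter's condition, so close_write and close_read notify as well; without that, a reader blocked on an empty pipe would never see end of file.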
Lock granularity
- std::shared_mutex
  - Readers–writer lock
  - lock_shared vs. lock