Overview
In this lecture, we discuss condition variables, sleep–wakeup races, and lock granularity.
The problem is waking up
std::mutex
gives us mutual exclusionstd::mutex
gives us blocking- Thread blocks until the mutex is unlocked
std::mutex
does not supply a wakeup primitive analogous to interruption- Problem: lost wakeups
Example: Torch relay
runner::handoff_to(runner* partner):
light partner’s torch;
drink;
runner::wait_for_fire():
wait until someone lights my torch;
run;
Polling torch relay
struct runner {
std::atomic<bool> torch_lit = false;
void handoff_to(runner* partner) {
partner->torch_lit = true;
}
void wait_for_fire() {
while (!this->torch_lit) {
}
}
};
Blocking torch relay, try 1
struct runner {
std::mutex mutex;
bool torch_lit = false;
void handoff_to(runner* partner) {
partner->mutex.lock();
partner->torch_lit = true;
partner->mutex.unlock();
}
void wait_for_fire() {
this->mutex.lock();
while (!this->torch_lit) {
}
this->mutex.unlock();
}
};
Deadlock!
Conditions for deadlock
- Mutual exclusion
- Each resource is held by at most one thread
- Hold and wait
- A thread holds one resource while attempting to acquire another
- Blocking wait (no preemption)
- A thread attempting to acquire a resource will do so forever
- Circular wait
Hypothetical solution: Wakeup point
wakeup_point.wait()
: Block until someone calls…wakeup_point.notify_all()
- Wake up any thread waiting in
wait()
- Do nothing if no thread is waiting
- Wake up any thread waiting in
Question
- How would you implement
wakeup_point
?
Blocking torch relay, try 2
struct runner {
std::mutex mutex;
bool torch_lit = false;
wakeup_point wakeup;
void handoff_to(runner* partner) {
partner->mutex.lock();
partner->torch_lit = true;
partner->wakeup.notify_all();
partner->mutex.unlock();
}
void wait_for_fire() {
this->mutex.lock();
while (!this->torch_lit) {
this->wakeup.wait();
}
this->mutex.unlock();
}
};
Blocking torch relay, try 3
struct runner {
std::mutex mutex;
bool torch_lit = false;
wakeup_point wakeup;
void handoff_to(runner* partner) {
partner->mutex.lock();
partner->torch_lit = true;
partner->wakeup.notify_all();
partner->mutex.unlock();
}
void wait_for_fire() {
this->mutex.lock();
while (!this->torch_lit) {
this->mutex.unlock();
this->wakeup.wait();
this->mutex.lock();
}
this->mutex.unlock();
}
};
- Lost wakeup problem
Condition variables
- A version of
wakeup_point
that avoids lost wakeups std::condition_variable_any
has two important methodswait(mutex)
: Block until another thread calls…notify_all()
- Atomic unlock and block
mutex
must be locked whenwait
is called- Thread atomically unlocks and blocks
- Any subsequent
mutex.lock()
will always observe the waiting thread as blocked - So
mutex.lock(); cv.notify_all()
sequence will always wake up the waiting thread!
Correct blocking torch relay
struct runner {
std::mutex mutex;
bool torch_lit = false;
std::condition_variable_any wakeup;
void handoff_to(runner* partner) {
partner->mutex.lock();
partner->torch_lit = true;
partner->wakeup.notify_all();
partner->mutex.unlock();
}
void wait_for_fire() {
this->mutex.lock();
while (!this->torch_lit) {
this->wakeup.wait(this->mutex);
}
this->mutex.unlock();
}
};
Lost and spurious wakeups
- Lost wakeup
- Thread blocks forever though it should not
- Serious problem, solved by condition variables
- Spurious wakeup
- Thread wakes up though it should not
- Unavoidable problem, solved by loops
Condition variables and predicates
- A
std::mutex
protects state from data races - A
std::condition_variable
represents a condition or predicate- Example: “Is the torch lit?”
- The lock passed to
wait()
should protect the state necessary to compute the predicate - Any code that might modify the predicate must call
notify_all()
Minipipe: The pipe for communication between threads
write
: Write to pipe- Block until there’s room to write or read end closed
read
: Read from pipe- Block until there’s data to read or write end closed
close_write
,close_read
minipipe
struct minipipe {
char pbuf;
bool full = false;
bool read_closed = false;
bool write_closed = false;
ssize_t write(const char* buf, size_t sz);
void close_write();
ssize_t read(char* buf, size_t sz);
void close_read();
};
minipipe::write
// minipipe::write(buf, sz)
// Write up to `sz` bytes from `buf` into the minipipe.
// Return value `ret` is:
// * `0 < ret <= sz`: Successfully wrote `ret` bytes.
// * `ret == -1 && errno == EPIPE`: Read end closed.
// * `ret == -1 && errno == EAGAIN`: Minipipe full.
// * `ret == 0`: `sz == 0`.
ssize_t minipipe::write(const char* buf, size_t sz) {
assert(!this->write_closed);
if (sz == 0) {
return 0;
} else if (this->read_closed) {
errno = EPIPE;
return -1;
} else if (this->full) {
errno = EAGAIN;
return -1;
} else {
this->pbuf = buf[0];
this->full = true;
return 1;
}
}
Write example
void writer_thread(minipipe& minib, const char* msg) {
size_t pos = 0, len = strlen(msg);
while (pos != len) {
ssize_t nw = minib.write(msg + pos, len - pos);
++nwrites;
if (nw > 0) {
pos += nw;
} else if (nw == -1 && errno != EAGAIN) {
fprintf(stderr, "writer_thread: %s\n", strerror(errno));
break;
}
}
minib.close_write();
}
minipipe-poll
Question
- How to eliminate data races?
Question
- How to eliminate data races and block?
Lock granularity
std::shared_mutex
- Readers–writer lock
lock_shared
vs.lock