This is not the current version of the class.

Synchronization 2: Condition variables and lost wakeups

Overview

In this lecture, we discuss condition variables, sleep–wakeup races, and lock granularity.

The problem is waking up

Example: Torch relay

Successful but boring relay handoff

relay runner:
   wait until someone lights my torch;
   run to partner;
   light partner’s torch;
   douse my torch;
   do it again

Polling torch relay

struct runner {
    std::atomic<bool> torch_lit = false;
};

void runner_thread(runner* self, runner* partner) {
    while (!done) {
        while (!self->torch_lit) {
        }
        while (partner->torch_lit) {
        }
        ++nhandoffs;
        partner->torch_lit = true;
        self->torch_lit = false;
    }
}

Blocking torch relay, try 1

struct runner {
    bool torch_lit = false;
    std::mutex m;
}

void runner_thread(runner* self, runner* partner) {
    while (!done) {
        self->m.lock();
        while (!self->torch_lit) {
        }
        self->m.unlock();
        partner->m.lock();
        while (partner->torch_lit) {
        }
        ++nhandoffs;
        partner->torch_lit = true;
        partner->m.unlock();
        self->m.lock();
        self->torch_lit = false;
        self->m.unlock();
    }
}

Deadlock!

Gridlock

Deadlock

Conditions for deadlock

Pulse the mutex to avoid deadlock

struct runner {
    bool torch_lit = false;
    std::mutex m;
}

void runner_thread(runner* self, runner* partner) {
    while (!done) {
        self->m.lock();
        while (!self->torch_lit) {
            self->m.unlock();
            self->m.lock();
        }
        self->m.unlock();
        ...

Blocking beyond mutual exclusion

Condition variables

Condition variable runner

struct runner {
    bool torch_lit = false;
    std::mutex m;
    std::condition_variable_any cv;
}

void runner_thread(runner* self, runner* partner) {
    while (!done) {
        self->m.lock();
        while (!self->torch_lit) {
            self->cv.wait(self->m);
        }
        self->m.unlock();
        ...
        partner->cv.notify_all();

Programming with condition variables

The importance of atomicity

A bad condition variable interface

void runner_thread(runner* self, runner* partner) {
    while (!done) {
        self->m.lock();
        while (!self->torch_lit) {
            self->m.unlock();
            self->cv.wait();
            self->m.lock();
        }
        self->m.unlock();

        partner->m.lock();
        ...
        partner->torch_lit = true;
        partner->notify_all();
        partner->m.unlock();
    }
}

Sleep–wakeup race

Lost and spurious wakeups

Condition variables and predicates

Why aren’t mutexes enough?

self->unlit_mutex.lock();
assert(self->torch_lit);

self->m.lock();
partner->m.lock();
assert(!partner->torch_lit);
partner->torch_lit = true;
partner->unlit_mutex.unlock();
self->torch_lit = false;
self->m.unlock();
self->unlit_mutex.lock();

Cache alignment and lock granularity

Minipipe: The pipe for communication between threads

minipipe

struct minipipe {
    char pbuf;
    bool full = false;
    bool read_closed = false;
    bool write_closed = false;

    ssize_t write(const char* buf, size_t sz);
    void close_write();

    ssize_t read(char* buf, size_t sz);
    void close_read();
};

minipipe::write

// minipipe::write(buf, sz)
//    Write up to `sz` bytes from `buf` into the minipipe.
//    Return value `ret` is:
//    * `0 < ret <= sz`: Successfully wrote `ret` bytes.
//    * `ret == -1 && errno == EPIPE`: Read end closed.
//    * `ret == -1 && errno == EAGAIN`: Minipipe full.
//    * `ret == 0`: `sz == 0`.

ssize_t minipipe::write(const char* buf, size_t sz) {
    assert(!this->write_closed);
    if (sz == 0) {
        return 0;
    } else if (this->read_closed) {
        errno = EPIPE;
        return -1;
    } else if (this->full) {
        errno = EAGAIN;
        return -1;
    } else {
        this->pbuf = buf[0];
        this->full = true;
        return 1;
    }
}

Write example

void writer_thread(minipipe& minib, const char* msg) {
    size_t pos = 0, len = strlen(msg);
    while (pos != len) {
        ssize_t nw = minib.write(msg + pos, len - pos);
        ++nwrites;

        if (nw > 0) {
            pos += nw;
        } else if (nw == -1 && errno != EAGAIN) {
            fprintf(stderr, "writer_thread: %s\n", strerror(errno));
            break;
        }
    }
    minib.close_write();
}

minipipe-poll

Question

Question

Lock granularity

std::shared_mutex

std::unique_lock vs. std::scoped_lock