This is not the current version of the class.
Toggle slide view

Synchronization 2: Condition variables and lost wakeups

Overview

In this lecture, we discuss condition variables, sleep–wakeup races, and lock granularity.

The problem is waking up

Example: Torch relay

Successful but boring relay handoff

runner::handoff_to(runner* partner):
   light partner’s torch;
   drink;

runner::wait_for_fire():
   wait until someone lights my torch;
   run;

Polling torch relay

struct runner {
    std::atomic<bool> torch_lit = false;

    void handoff_to(runner* partner) {
        partner->torch_lit = true;
    }

    void wait_for_fire() {
        while (!this->torch_lit) {
        }
    }
};

Blocking torch relay, try 1

struct runner {
    std::mutex mutex;
    bool torch_lit = false;

    void handoff_to(runner* partner) {
        partner->mutex.lock();
        partner->torch_lit = true;
        partner->mutex.unlock();
    }

    void wait_for_fire() {
        this->mutex.lock();
        while (!this->torch_lit) {
        }
        this->mutex.unlock();
    }
};

Deadlock!

Gridlock

Deadlock

Conditions for deadlock

Hypothetical solution: Wakeup point

Question

Blocking torch relay, try 2

struct runner {
    std::mutex mutex;
    bool torch_lit = false;
    wakeup_point wakeup;

    void handoff_to(runner* partner) {
        partner->mutex.lock();
        partner->torch_lit = true;
        partner->wakeup.notify_all();
        partner->mutex.unlock();
    }

    void wait_for_fire() {
        this->mutex.lock();
        while (!this->torch_lit) {
            this->wakeup.wait();
        }
        this->mutex.unlock();
    }
};

Blocking torch relay, try 3

struct runner {
    std::mutex mutex;
    bool torch_lit = false;
    wakeup_point wakeup;

    void handoff_to(runner* partner) {
        partner->mutex.lock();
        partner->torch_lit = true;
        partner->wakeup.notify_all();
        partner->mutex.unlock();
    }

    void wait_for_fire() {
        this->mutex.lock();
        while (!this->torch_lit) {
            this->mutex.unlock();
            this->wakeup.wait();
            this->mutex.lock();
        }
        this->mutex.unlock();
    }
};

Condition variables

Correct blocking torch relay

struct runner {
    std::mutex mutex;
    bool torch_lit = false;
    std::condition_variable_any wakeup;

    void handoff_to(runner* partner) {
        partner->mutex.lock();
        partner->torch_lit = true;
        partner->wakeup.notify_all();
        partner->mutex.unlock();
    }

    void wait_for_fire() {
        this->mutex.lock();
        while (!this->torch_lit) {
            this->wakeup.wait(this->mutex);
        }
        this->mutex.unlock();
    }
};

Lost and spurious wakeups

Condition variables and predicates

Minipipe: The pipe for communication between threads

minipipe

struct minipipe {
    char pbuf;
    bool full = false;
    bool read_closed = false;
    bool write_closed = false;

    ssize_t write(const char* buf, size_t sz);
    void close_write();

    ssize_t read(char* buf, size_t sz);
    void close_read();
};

minipipe::write

// minipipe::write(buf, sz)
//    Write up to `sz` bytes from `buf` into the minipipe.
//    Return value `ret` is:
//    * `0 < ret <= sz`: Successfully wrote `ret` bytes.
//    * `ret == -1 && errno == EPIPE`: Read end closed.
//    * `ret == -1 && errno == EAGAIN`: Minipipe full.
//    * `ret == 0`: `sz == 0`.

ssize_t minipipe::write(const char* buf, size_t sz) {
    assert(!this->write_closed);
    if (sz == 0) {
        return 0;
    } else if (this->read_closed) {
        errno = EPIPE;
        return -1;
    } else if (this->full) {
        errno = EAGAIN;
        return -1;
    } else {
        this->pbuf = buf[0];
        this->full = true;
        return 1;
    }
}

Write example

void writer_thread(minipipe& minib, const char* msg) {
    size_t pos = 0, len = strlen(msg);
    while (pos != len) {
        ssize_t nw = minib.write(msg + pos, len - pos);
        ++nwrites;

        if (nw > 0) {
            pos += nw;
        } else if (nw == -1 && errno != EAGAIN) {
            fprintf(stderr, "writer_thread: %s\n", strerror(errno));
            break;
        }
    }
    minib.close_write();
}

minipipe-poll

Question

Question

Lock granularity

std::shared_mutex

std::unique_lock vs. std::scoped_lock