
Synchronization 3: Mutexes, condition variables, and compare-exchange

We continue from where the previous lecture left off -- implementing a synchronized bounded buffer.

Recall that a bounded buffer is a circular buffer that can hold up to CAP characters, where CAP is the capacity of the buffer. The bounded buffer supports the following operations:

read(buf, sz) -- Copies up to sz characters out of the buffer into buf, returning the number of characters read. Blocks if the buffer is empty.
write(buf, sz) -- Copies up to sz characters from buf into the buffer, returning the number of characters written. Blocks if the buffer is full.

Example: Assuming we have a bounded buffer object bbuf with CAP=4.

bbuf.write("ABCDE", 5); // returns 4
bbuf.read(buf, 3);      // returns 3 ("ABC")
bbuf.read(buf, 3);      // returns 1 ("D")
bbuf.read(buf, 3);      // blocks

The bounded buffer preserves two important properties:

1. Every character written to the buffer is read out of the buffer exactly once (no character is lost or duplicated).
2. Characters are read out of the buffer in the same order they were written (first in, first out).

Each bounded buffer operation should also be atomic, just like read() and write() system calls.

Review the bounded buffer implementation in synch2/bbuffer-basic.cc to see how the circular buffer operates. This version of the bounded buffer does not yet perform synchronization, so it only works with a single thread. Instead of blocking, the buffer returns -1 whenever blocking should occur. We will fix this in the synchronized version.

To help us design a synchronized bounded buffer, we rely on something we call the Fundamental Law of Synchronization.

Fundamental Law of Synchronization: If two threads simultaneously access an object in memory, then both accesses must be reads. Otherwise, the program invokes undefined behavior.

C++ std::atomic-wrapped variables are an exception to this rule. They can be read and written concurrently by different threads, and the behavior is well-defined by the C++ memory consistency model.

Now let's look at the definition of the bounded buffer:

struct bbuffer {
    static constexpr size_t bcapacity = 128;
    char bbuf_[bcapacity];
    size_t bpos_ = 0;
    size_t blen_ = 0;
    bool write_closed_ = false;

    ...
};

The bounded buffer's internal state (bbuf_, bpos_, blen_, and write_closed_) is both modified and read by the read() and write() methods. Local variables defined within these methods are not shared. So we need to synchronize accesses to the shared variables (the internal state of the buffer), but not to local variables.

Mutex synchronization

We can synchronize the bounded buffer using std::mutex objects. We associate a mutex with the internal state of the buffer, and access that internal state only while the mutex is locked.

The bbuffer::write() method in synch2/bbuffer-mutex.cc is implemented with mutex synchronization. Note that we added a mutex member to the bbuffer struct definition, and we access the internal state of the buffer only between this->mutex_.lock() and this->mutex_.unlock(), which is the period when the thread holds the mutex.
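The file itself is not reproduced in these notes; the following self-contained sketch shows the shape of the mutex-synchronized write() (the mutex_ member and the lock placement follow the description above; the exact return convention is an assumption):

```cpp
#include <cassert>
#include <cstddef>
#include <mutex>
#include <sys/types.h>   // ssize_t

struct bbuffer {
    static constexpr size_t bcapacity = 128;
    char bbuf_[bcapacity];
    size_t bpos_ = 0;
    size_t blen_ = 0;
    bool write_closed_ = false;
    std::mutex mutex_;   // protects all of the fields above

    ssize_t write(const char* buf, size_t sz);
};

ssize_t bbuffer::write(const char* buf, size_t sz) {
    this->mutex_.lock();      // no shared state is touched before this point
    assert(!this->write_closed_);
    size_t pos = 0;
    while (pos < sz && this->blen_ < bcapacity) {
        size_t bindex = (this->bpos_ + this->blen_) % bcapacity;
        this->bbuf_[bindex] = buf[pos];
        ++this->blen_;
        ++pos;
    }
    this->mutex_.unlock();    // no shared state is touched after this point
    return pos;
}
```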

The association between a mutex and the state it protects is rather arbitrary. Mutexes are also called "advisory locks" because their association with the state they protect is not enforced in any way by the runtime system; it must be maintained by the programmer. Their effectiveness relies solely on the program following the protocol that associates the mutex with the protected state. In other words, if a mutex is not used correctly, there is no guarantee that the underlying state is properly protected.

Mutex pitfalls

Now consider the following code, which moves the mutex lock()/unlock() pair inside the while loop. We still have just one lock() and one unlock() in our code. Is it correct?

ssize_t bbuffer::write(const char* buf, size_t sz) {
    assert(!this->write_closed_);
    size_t pos = 0;
    while (pos < sz && this->blen_ < bcapacity) {
        this->mutex_.lock();
        size_t bindex = (this->bpos_ + this->blen_) % bcapacity;
        this->bbuf_[bindex] = buf[pos];
        ++this->blen_;
        ++pos;
        this->mutex_.unlock();
    }
    ...
}

The code is incorrect because the while loop's condition reads this->blen_ without holding the mutex, even though blen_ is shared state that must be protected.

What about the following code -- is it correct?

ssize_t bbuffer::write(const char* buf, size_t sz) {
    this->mutex_.lock();
    assert(!this->write_closed_);
    size_t pos = 0;
    while (pos < sz && this->blen_ < bcapacity) {
        this->mutex_.lock();
        size_t bindex = (this->bpos_ + this->blen_) % bcapacity;
        this->bbuf_[bindex] = buf[pos];
        ++this->blen_;
        ++pos;
        this->mutex_.unlock();
    }
    ...
}

It's also wrong! Upon entering the while loop for the first time, the mutex is already locked, and we are trying to lock it again. Locking a std::mutex twice in the same thread is undefined behavior; in practice, the second lock attempt typically blocks forever, deadlocking the thread.

So what if we do this:

ssize_t bbuffer::write(const char* buf, size_t sz) {
    this->mutex_.lock();
    assert(!this->write_closed_);
    size_t pos = 0;
    while (pos < sz && this->blen_ < bcapacity) {
        this->mutex_.unlock();
        this->mutex_.lock();
        size_t bindex = (this->bpos_ + this->blen_) % bcapacity;
        this->bbuf_[bindex] = buf[pos];
        ++this->blen_;
        ++pos;
        this->mutex_.unlock();
        this->mutex_.lock();
    }
    ...
}

Now everything is protected, right? NO! This is also incorrect and in many ways much worse than the two previous cases.

Although this->blen_ now seemingly falls under the mutex's protection, it is protected in a different critical region from the one protecting the rest of the buffer state (bbuf_, bpos_). Furthermore, the mutex is unlocked at the end of every iteration of the while loop. This means that when two threads call the write() method concurrently, the lock can bounce between the two threads and the characters written by the threads can be interleaved, violating the atomicity requirement of the write() method.

C++ mutex patterns

It is very common to write a synchronized function that must lock a mutex first and unlock it before returning. Doing this repeatedly is tedious. Also, if the function can return at multiple points, it is easy to forget an unlock statement before one of the returns, resulting in errors. C++ has a pattern that deals with these problems and simplifies programming: the scoped lock.

We use scoped locks to simplify programming of the bounded buffer in synch2/bbuffer-scoped.cc. The write() method now looks like the following:

ssize_t bbuffer::write(const char* buf, size_t sz) {
    std::unique_lock<std::mutex> guard(this->mutex_);
    assert(!this->write_closed_);
    size_t pos = 0;
    while (pos < sz && this->blen_ < bcapacity) {
        size_t bindex = (this->bpos_ + this->blen_) % bcapacity;
        this->bbuf_[bindex] = buf[pos];
        ++this->blen_;
        ++pos;
    }
    ...
}

Note that in the first line of the function body we declare a std::unique_lock object, which is a scoped lock that locks the mutex for the scope of the function. When the std::unique_lock object is initialized, the mutex is automatically locked, and when the object goes out of scope, the mutex is automatically unlocked. Scoped lock objects lock and unlock mutexes in their constructors and destructors to achieve this effect. This design pattern is called Resource Acquisition Is Initialization (RAII), and it is common in software engineering in general. RAII simplifies coding and avoids certain programming errors.
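For instance, in a function with multiple return paths (the function and names below are hypothetical), the scoped lock guarantees the mutex is released on every path:

```cpp
#include <mutex>

std::mutex m;
int balance = 100;

// Every return path unlocks `m` automatically -- including the early
// return -- so no unlock call can be forgotten.
bool withdraw(int amount) {
    std::unique_lock<std::mutex> guard(m);   // locks m here
    if (amount > balance) {
        return false;                        // guard's destructor unlocks m
    }
    balance -= amount;
    return true;                             // guard's destructor unlocks m
}
```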

You should try to leverage these scoped lock objects for your synchronization tasks in the problem set as well.

Condition variables

So far we have implemented everything in the spec of the bounded buffer except blocking. It turns out a mutex alone is not enough to implement this feature -- we need another synchronization object. In standard C++ this object is called a condition variable.

A condition variable supports the following operations:

wait(lock) -- Atomically unlocks the mutex held by lock and blocks the calling thread. When the thread wakes up, it re-locks the mutex before wait() returns.
notify_one() -- Wakes up one thread blocked on the condition variable.
notify_all() -- Wakes up all threads blocked on the condition variable.

Condition variables are designed to avoid the sleep-wakeup race conditions we briefly visited when we discussed signal handlers. The atomicity of the unlocking and the blocking in wait() guarantees that a thread that blocks cannot miss a notify_all() message.

Logically, the writer to the bounded buffer should block when the buffer becomes full, and should unblock when the buffer becomes nonfull again. Let's create a condition variable, called nonfull_, in the bounded buffer, just under the mutex. Note that we conveniently named the condition variable after the condition under which the function should unblock; this makes the code easier to read later on. The write() method that implements blocking is in synch2/bbuffer-cond.cc. It looks like the following:

ssize_t bbuffer::write(const char* buf, size_t sz) {
    std::unique_lock<std::mutex> guard(this->mutex_);
    assert(!this->write_closed_);
    while (this->blen_ == bcapacity) {  // #1
        this->nonfull_.wait(guard);
    }
    size_t pos = 0;
    while (pos < sz && this->blen_ < bcapacity) {
        size_t bindex = (this->bpos_ + this->blen_) % bcapacity;
        this->bbuf_[bindex] = buf[pos];
        ++this->blen_;
        ++pos;
    }
    ...
}

The new code at #1 implements blocking until the condition is met. This is a common pattern when using condition variables: the condition variable's wait() function is almost always called in a while loop, and the loop tests the condition under which the function must block.

On the other hand, notify_all() should be called whenever a change we made might turn the unblocking condition true. In our scenario, this means we must call notify_all() in the read() method, which takes characters out of the buffer and can potentially unblock the writer, as shown in the inserted code #2 below:

ssize_t bbuffer::read(char* buf, size_t sz) {
    std::unique_lock<std::mutex> guard(this->mutex_);
    ...
    while (pos < sz && this->blen_ > 0) {
        buf[pos] = this->bbuf_[this->bpos_];
        this->bpos_ = (this->bpos_ + 1) % bcapacity;
        --this->blen_;
        ++pos;
    }
    if (pos > 0) {                   // #2
        this->nonfull_.notify_all();
    }
    ...
}

Note that we put notify_all() inside just an if, but put wait() inside a while loop. Why is it necessary to have wait() in a while loop?

wait() is almost always used in a loop because wakeups can be spurious. Since notify_all() wakes up all threads blocked on the condition variable, by the time a particular woken thread reacquires the mutex and gets to run, some other woken thread may already have made progress and changed the unblocking condition back to false. (The runtime may also wake a waiting thread even when nothing called notify at all -- a true spurious wakeup.) For these reasons, a woken thread must revalidate the unblocking condition before proceeding further, and if the condition does not hold it must go back to blocking. The while loop achieves exactly this.

Implementing a mutex with a single bit

Last time we showed that we can implement a busy-waiting mutex using an atomic counter. Now we show that a mutex can also be implemented using just a single bit, with some special atomic machine instructions.

A busy-waiting mutex (also called a spin lock) can be implemented as follows:

struct mutex {
    std::atomic<int> spinlock{0};   // 0 means unlocked, 1 means locked

    void lock() {
        while (spinlock.exchange(1) == 1) {}
    }

    void unlock() {
        spinlock.store(0);
    }
};

The spinlock.exchange() method is an atomic swap: in one atomic step, it stores the specified value into the atomic spinlock variable and returns the variable's old value.

It works because lock() will not return unless spinlock previously contained the value 0 (meaning unlocked). In that case it atomically stores the value 1 (meaning locked) to spinlock, preventing other lock() calls from returning and hence ensuring mutual exclusion. While it spin-waits, it simply swaps spinlock's old value 1 with 1, effectively leaving the lock untouched. Please take a moment to appreciate how this simple construct correctly implements mutual exclusion.

x86-64 provides this atomic swap functionality via the lock xchg assembly instruction. We have shown that it is all we need to implement a mutex with just one bit. x86-64 also provides a more powerful atomic instruction that expands what can be done atomically on modern processors. The instruction is called compare-exchange, or lock cmpxchg. It is powerful enough to implement atomic swap, add, subtract, multiply, square root, and many other operations you can think of.

The behavior of the instruction is defined as follows:

// Everything in one atomic step
int compare_exchange(int* object, int expected, int desired) {
    if (*object == expected) {
        *object = desired;
        return expected;
    } else {
        return *object;
    }
}

This instruction is also accessible as the compare_exchange_strong() member function of C++ std::atomic objects. Instead of returning the old value, it returns a boolean indicating whether the exchange succeeded, and on failure it updates its expected argument (passed by reference) to the value it actually observed.

For example, we can use the compare-exchange instruction to atomically add 7 to any integer:

void add7(int* x) {
    int expected = *x;
    int observed;
    // Retry until no other thread changed *x between our read and our write.
    while ((observed = compare_exchange(x, expected, expected + 7))
            != expected) {
        expected = observed;   // retry with the value we actually saw
    }
}
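For reference, the same loop can be written against std::atomic using compare_exchange_strong(), which on failure overwrites its expected argument with the value it actually observed, so no separate re-read is needed:

```cpp
#include <atomic>

// CAS loop: keep retrying until no other thread modified x between
// our read of `expected` and our attempted write of `expected + 7`.
void add7(std::atomic<int>& x) {
    int expected = x.load();
    while (!x.compare_exchange_strong(expected, expected + 7)) {
        // on failure, `expected` now holds x's current value; just retry
    }
}
```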