
Synchronization 2: Mutex, bounded buffers

Recall that in the last lecture we showed that updating shared memory variables in concurrent threads requires synchronization; otherwise some updates may be lost because of the interleaving of operations in different threads. Synchronized updates can be achieved by using the C++ std::atomic template library. On x86-64, the compiler then translates the addl instruction used to perform the update into a lock-prefixed instruction that behaves atomically.
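As a reminder of what that looks like, here is a minimal sketch of an atomic counter shared by several threads. The helper name count_with_atomic and its parameters are made up for illustration; they are not taken from the lecture code.

```cpp
#include <atomic>
#include <cassert>
#include <thread>
#include <vector>

// Hypothetical helper: start `nthreads` threads that each increment a shared
// atomic counter `niters` times, then return the final value.
unsigned count_with_atomic(int nthreads, unsigned niters) {
    assert(nthreads >= 0);
    std::atomic<unsigned> x{0};
    std::vector<std::thread> threads;
    for (int t = 0; t != nthreads; ++t) {
        threads.emplace_back([&x, niters]() {
            for (unsigned i = 0; i != niters; ++i) {
                ++x;   // atomic read-modify-write: no increment is lost
            }
        });
    }
    for (auto& th : threads) {
        th.join();
    }
    return x.load();
}
```

Calling count_with_atomic(4, 100000) should return exactly 400000, whereas the same loop on a plain unsigned would usually lose some increments.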

C++'s std::atomic library is powerful (and a big step forward in standardizing atomic operations), but it works only on integers that are word-sized or below. To synchronize more complex objects in memory or perform more complex synchronization tasks, we need abstractions called synchronization objects.

Synchronization objects are types whose methods can be used to achieve synchronization and atomicity on normal (non-std::atomic-wrapped) objects. Synchronization objects provide various abstract properties that simplify programming multi-threaded access to shared data. The most common synchronization object is called a mutex, which provides the mutual exclusion property.

Mutual exclusion

Mutual exclusion means that at most one thread accesses the shared data at a time.

In our multi-threaded incr-basic.cc example from the last lecture, the code does not work because more than one thread can access the shared variable at a time. The code would behave correctly if the mutual exclusion policy were enforced. We can use a mutex object to enforce mutual exclusion (incr-mutex.cc). In this example it has the same effect as wrapping *x in a std::atomic template:

std::mutex mutex;

void threadfunc(unsigned* x) {
    for (int i = 0; i != 10000000; ++i) {
        mutex.lock();
        *x += 1;
        mutex.unlock();
    }
}

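The mutex has an internal state (denoted by state), which can be either locked or unlocked. The semantics of a mutex object are as follows. The state starts out unlocked. The lock() method waits until state is unlocked and then sets it to locked; the check and the update happen as one atomic step. The unlock() method sets state back to unlocked, and should be called only by the thread that currently holds the lock.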

The mutual exclusion policy is enforced in the code region between the lock() and unlock() invocations. We call this region the critical region.
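A complete driver around threadfunc might look like the following sketch. The run_threads helper is hypothetical, and the iteration count is passed as a parameter (an assumption made so the example runs quickly); incr-mutex.cc may be organized differently.

```cpp
#include <cassert>
#include <mutex>
#include <thread>
#include <vector>

std::mutex mutex;

// Same loop as above, with the iteration count passed in.
void threadfunc(unsigned* x, int niters) {
    for (int i = 0; i != niters; ++i) {
        mutex.lock();      // enter the critical region
        *x += 1;           // at most one thread executes this at a time
        mutex.unlock();    // leave the critical region
    }
}

// Hypothetical driver: run `nthreads` copies of threadfunc on one counter.
unsigned run_threads(int nthreads, int niters) {
    assert(nthreads >= 0 && niters >= 0);
    unsigned x = 0;
    std::vector<std::thread> threads;
    for (int t = 0; t != nthreads; ++t) {
        threads.emplace_back(threadfunc, &x, niters);
    }
    for (auto& th : threads) {
        th.join();
    }
    return x;   // safe to read: all threads have been joined
}
```

Note that x here is a plain unsigned, not a std::atomic. The mutex alone is what keeps the increments from interleaving.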

Implementing a mutex

Let's now think about how we may implement such a mutex object.

Does the following work?

struct mutex {
    static constexpr int unlocked = 0;
    static constexpr int locked = 1;

    int state = unlocked;

    void lock() {
        while (state == locked) {}
        state = locked;
    }

    void unlock() {
        state = unlocked;
    }
};

No! Imagine that two threads call lock() on an unlocked mutex at the same time. They both observe that state == unlocked, skip the while loop, set state = locked, and return. Now both threads think they hold the lock and proceed into the critical region at the same time. Not good!

To implement a mutex properly we need an atomic read-modify-write operation. std::atomic provides such operations in the form of operator++ and operator--. The following mutex implementation should be correct:

struct mutex {
    std::atomic<int> state = 0;

    void lock() {
        while (++state != 1) {
            --state;
        }
    }

    void unlock() {
        --state;
    }
};

Note that the unlock() method performs an atomic decrement operation instead of simply writing 0. Simply storing 0 to the mutex state is incorrect: if this store occurred between the ++state and --state steps of another thread spinning in the while loop of lock(), that thread's decrement would make state negative. From then on ++state can never produce 1 reliably, and the while loop can never exit.
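To see the increment/decrement mutex in use, here is a small sketch that protects a plain counter with it. The names spin_mutex and count_with_spin_mutex are assumptions chosen for this example (the struct is renamed to avoid confusion with std::mutex), and state is brace-initialized so the code also compiles before C++17.

```cpp
#include <atomic>
#include <cassert>
#include <thread>
#include <vector>

// The mutex from the text, under a hypothetical name.
struct spin_mutex {
    std::atomic<int> state{0};

    void lock() {
        // ++state yields 1 only if no other thread holds or is probing the
        // lock; otherwise undo the increment and try again.
        while (++state != 1) {
            --state;
        }
    }

    void unlock() {
        --state;   // atomic decrement, not a plain store of 0
    }
};

// Hypothetical helper: increment a plain (non-atomic) counter from several
// threads, using spin_mutex to form the critical region.
unsigned count_with_spin_mutex(int nthreads, unsigned niters) {
    assert(nthreads >= 0);
    spin_mutex m;
    unsigned x = 0;
    std::vector<std::thread> threads;
    for (int t = 0; t != nthreads; ++t) {
        threads.emplace_back([&m, &x, niters]() {
            for (unsigned i = 0; i != niters; ++i) {
                m.lock();
                x += 1;
                m.unlock();
            }
        });
    }
    for (auto& th : threads) {
        th.join();
    }
    return x;
}
```

Waiting threads spin in lock() and burn CPU while they wait, so this is a teaching sketch rather than a replacement for std::mutex.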

With the help of mutex, we can build objects that support synchronized accesses with more complex semantics.

Bounded buffers

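A bounded buffer is a synchronized object that holds at most a fixed number of bytes and supports the following operations. read(buf, sz) copies up to sz bytes out of the buffer into buf and returns the number of bytes read. write(buf, sz) copies up to sz bytes from buf into the buffer and returns the number of bytes written.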

The bounded buffer is the abstraction used to implement pipes and non-seekable stdio caches.

Unsynchronized buffer

Let's first look at a bounded buffer implementation bbuffer-basic.cc, which does not perform any synchronization.

struct bbuffer {
    static constexpr size_t bcapacity = 128;
    char bbuf_[bcapacity];
    size_t bpos_ = 0;
    size_t blen_ = 0;
    bool write_closed_ = false;

    ssize_t read(char* buf, size_t sz);
    ssize_t write(const char* buf, size_t sz);
    void shutdown_write();
};

ssize_t bbuffer::write(const char* buf, size_t sz) {
    assert(!this->write_closed_);
    size_t pos = 0;
    while (pos < sz && this->blen_ < bcapacity) {
        size_t bindex = (this->bpos_ + this->blen_) % bcapacity;
        size_t bspace = std::min(bcapacity - bindex, bcapacity - this->blen_);
        size_t n = std::min(sz - pos, bspace);
        memcpy(&this->bbuf_[bindex], &buf[pos], n);
        this->blen_ += n;
        pos += n;
    }
    if (pos == 0 && sz > 0) {
        return -1;  // try again
    } else {
        return pos;
    }
}

ssize_t bbuffer::read(char* buf, size_t sz) {
    size_t pos = 0;
    while (pos < sz && this->blen_ > 0) {
        size_t bspace = std::min(this->blen_, bcapacity - this->bpos_);
        size_t n = std::min(sz - pos, bspace);
        memcpy(&buf[pos], &this->bbuf_[this->bpos_], n);
        this->bpos_ = (this->bpos_ + n) % bcapacity;
        this->blen_ -= n;
        pos += n;
    }
    if (pos == 0 && sz > 0 && !this->write_closed_) {
        return -1;  // try again
    } else {
        return pos;
    }
}

It implements what we call a circular buffer. Every time we read characters out of the buffer, we advance bpos_, which is the index of the next character to be read in the buffer. Whenever we write to the buffer, we increase blen_, which is the number of bytes currently occupied in the buffer. The buffer is circular, which is why all index arithmetic is modulo the total capacity of the buffer.
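The wraparound arithmetic is easier to follow with concrete numbers. The sketch below isolates the two lines of bbuffer::write() that compute where the next write starts and how many bytes fit there contiguously. The wspan struct and the next_write_span function are hypothetical names introduced only for this illustration.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>

constexpr std::size_t bcapacity = 128;

// Hypothetical result type: start index and contiguous free space.
struct wspan {
    std::size_t index;
    std::size_t space;
};

// Mirrors the index arithmetic in bbuffer::write() for a buffer whose read
// position is `bpos` and which currently holds `blen` bytes.
wspan next_write_span(std::size_t bpos, std::size_t blen) {
    assert(bpos < bcapacity && blen <= bcapacity);
    std::size_t bindex = (bpos + blen) % bcapacity;
    std::size_t bspace = std::min(bcapacity - bindex, bcapacity - blen);
    return {bindex, bspace};
}
```

For example, with bpos = 120 and blen = 4 the next write starts at index 124 and only 4 bytes fit before the end of the array. Once those are written (blen = 8), the next write wraps around to index 0, where 120 bytes are free. When blen = 128 the space is 0 and the write loop stops.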

When there is just one thread accessing the buffer, it works perfectly fine. But does it work when multiple threads are using the buffer at the same time? In our test program in bbuffer-basic.cc, we have one reader thread reading from the buffer and a second writer thread writing to the buffer. We would expect everything written to the buffer by the writer thread to show up exactly as it was written once read out by the reader thread.

It does not work, because there is no synchronization over the internal state of the bounded buffer. bbuffer::read() and bbuffer::write() both modify the internal state of the bbuffer object (most critically bpos_ and blen_), and such accesses require synchronization to work correctly in a multi-threaded environment. One way to fix this is to wrap the function bodies of the read() and write() methods in critical regions, using a mutex.

A correct version of a synchronized bounded buffer can be found in bbuffer-mutex.cc. Key differences from the unsynchronized version are highlighted below:

struct bbuffer {
    ...

    std::mutex mutex_;

    ...
};

ssize_t bbuffer::write(const char* buf, size_t sz) {
    this->mutex_.lock();

    ...

    this->mutex_.unlock();
    if (pos == 0 && sz > 0) {
        return -1;  // try again
    } else {
        return pos;
    }
}

ssize_t bbuffer::read(char* buf, size_t sz) {
    this->mutex_.lock();

    ...

    this->mutex_.unlock();
    if (pos == 0 && sz > 0 && !this->write_closed_) {
        return -1;  // try again
    } else {
        return pos;
    }
}

This correctly implements a synchronized bounded buffer. Simply wrapping accesses to shared state within critical sections using a mutex is the easiest and probably also the most common way to make complex in-memory objects synchronized (or thread-safe).
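For readers who want something to compile, the following sketch combines the unsynchronized method bodies shown earlier with a per-object mutex and adds a small reader/writer test. It is not bbuffer-mutex.cc: it swaps the explicit lock()/unlock() calls for std::lock_guard (which unlocks automatically on return), the body of shutdown_write() is an assumption, and the transfer() driver is hypothetical.

```cpp
#include <algorithm>
#include <cassert>
#include <cstring>
#include <mutex>
#include <string>
#include <thread>
#include <sys/types.h>   // ssize_t (POSIX)

struct bbuffer {
    static constexpr size_t bcapacity = 128;
    char bbuf_[bcapacity];
    size_t bpos_ = 0;
    size_t blen_ = 0;
    bool write_closed_ = false;
    std::mutex mutex_;

    ssize_t read(char* buf, size_t sz);
    ssize_t write(const char* buf, size_t sz);
    void shutdown_write();
};

ssize_t bbuffer::write(const char* buf, size_t sz) {
    std::lock_guard<std::mutex> guard(this->mutex_);   // held until return
    assert(!this->write_closed_);
    size_t pos = 0;
    while (pos < sz && this->blen_ < bcapacity) {
        size_t bindex = (this->bpos_ + this->blen_) % bcapacity;
        size_t bspace = std::min(bcapacity - bindex, bcapacity - this->blen_);
        size_t n = std::min(sz - pos, bspace);
        std::memcpy(&this->bbuf_[bindex], &buf[pos], n);
        this->blen_ += n;
        pos += n;
    }
    return (pos == 0 && sz > 0) ? -1 : (ssize_t) pos;   // -1 means try again
}

ssize_t bbuffer::read(char* buf, size_t sz) {
    std::lock_guard<std::mutex> guard(this->mutex_);
    size_t pos = 0;
    while (pos < sz && this->blen_ > 0) {
        size_t bspace = std::min(this->blen_, bcapacity - this->bpos_);
        size_t n = std::min(sz - pos, bspace);
        std::memcpy(&buf[pos], &this->bbuf_[this->bpos_], n);
        this->bpos_ = (this->bpos_ + n) % bcapacity;
        this->blen_ -= n;
        pos += n;
    }
    return (pos == 0 && sz > 0 && !this->write_closed_) ? -1 : (ssize_t) pos;
}

// Assumed implementation: mark the write end closed under the lock.
void bbuffer::shutdown_write() {
    std::lock_guard<std::mutex> guard(this->mutex_);
    this->write_closed_ = true;
}

// Hypothetical driver: a writer thread pushes `msg` through the buffer while
// the calling thread reads it back out. Returns everything that was read.
std::string transfer(const std::string& msg) {
    bbuffer bb;
    std::thread writer([&bb, &msg]() {
        size_t pos = 0;
        while (pos < msg.size()) {
            ssize_t r = bb.write(&msg[pos], msg.size() - pos);
            if (r > 0) {
                pos += r;
            } else {
                std::this_thread::yield();   // buffer full: retry later
            }
        }
        bb.shutdown_write();
    });
    std::string out;
    char buf[64];
    while (true) {
        ssize_t r = bb.read(buf, sizeof(buf));
        if (r > 0) {
            out.append(buf, r);
        } else if (r == 0) {
            break;                           // empty and write end closed
        } else {
            std::this_thread::yield();       // buffer empty: retry later
        }
    }
    writer.join();
    return out;
}
```

Both threads retry in a loop when the buffer is full or empty, which wastes CPU time; that polling is a property of this test driver, not of the mutex.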

Using one mutex for the entire object is what we call coarse-grained synchronization. It is correct, but it also limits concurrency. Under this scheme, whenever a writer is writing to the buffer, the reader can't acquire the lock and has to wait until the writer finishes. The reader and the writer can never truly read from and write to the buffer at the same time. We will show in the next lecture that we can achieve more concurrency by using a different synchronization object.