Recall that in the last lecture we showed that updating shared memory variables in concurrent threads requires synchronization; otherwise some updates may get lost because of the interleaving of operations in different threads. Synchronized updates can be achieved by using the C++ std::atomic template library, which automatically translates the addl instruction used to perform the update into a lock-prefixed instruction that behaves atomically.
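As a refresher, here is a minimal sketch of that pattern (the function and variable names are illustrative, not taken from the lecture's code):

#include <atomic>
#include <thread>

std::atomic<unsigned> count{0};

void threadfunc() {
    for (int i = 0; i != 10000000; ++i) {
        // This increment typically compiles to a lock-prefixed addl (or incl)
        // on x86-64, so the read-modify-write is one indivisible step.
        ++count;
    }
}

int main() {
    std::thread t1(threadfunc);
    std::thread t2(threadfunc);
    t1.join();
    t2.join();
    // count is now exactly 20000000; no updates are lost.
}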
C++'s std::atomic library is powerful (and a great step forward in the standardization of atomic operations), but it works only on integers that are word-sized or smaller. To synchronize more complex objects in memory, or to perform more complex synchronization tasks, we need abstractions called synchronization objects.
Synchronization objects are types whose methods can be used to achieve synchronization and atomicity on normal (non-std::atomic-wrapped) objects. Synchronization objects provide various abstract properties that simplify programming multi-threaded access to shared data. The most common synchronization object is called a mutex, which provides the mutual exclusion property.
Mutual exclusion
Mutual exclusion means that at most one thread accesses the shared data at a time.
In our multi-threaded incr-basic.cc example from the last lecture, the code does not work because more than one thread can access the shared variable at a time. The code would behave correctly if the mutual exclusion policy were enforced. We can use a mutex object to enforce mutual exclusion (incr-mutex.cc). In this example it has the same effect as wrapping *x in a std::atomic template:
std::mutex mutex;

void threadfunc(unsigned* x) {
    for (int i = 0; i != 10000000; ++i) {
        mutex.lock();
        *x += 1;
        mutex.unlock();
    }
}
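As an aside, modern C++ also provides RAII-style wrappers for mutexes in the <mutex> header. Here is a sketch of the same loop using std::lock_guard, which unlocks automatically (an alternative style, not necessarily how incr-mutex.cc is written):

void threadfunc(unsigned* x) {
    for (int i = 0; i != 10000000; ++i) {
        // Locks `mutex` on construction and unlocks it automatically when
        // `guard` goes out of scope at the end of each loop iteration.
        std::lock_guard<std::mutex> guard(mutex);
        *x += 1;
    }
}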
The mutex has an internal state (denoted by state), which can be either locked or unlocked. The semantics of a mutex object are as follows:
- Upon initialization, state = unlocked.
- mutex::lock() method: waits until state becomes unlocked, and then atomically sets state = locked. Note that these two steps must complete in one atomic operation.
- mutex::unlock() method: asserts that state == locked, then sets state = unlocked.
The mutual exclusion policy is enforced in the code region between the lock() and unlock() invocations. We call this region the critical region.
Implementing a mutex
Let's now think about how we may implement such a mutex object.
Does the following work?
struct mutex {
    static constexpr int unlocked = 0;
    static constexpr int locked = 1;

    int state = unlocked;

    void lock() {
        while (state == locked) {}
        state = locked;
    }

    void unlock() {
        state = unlocked;
    }
};
No! Imagine that two threads call lock() on an unlocked mutex at the same time. They both observe that state == unlocked, skip the while loop, set state = locked, and then return. Now both threads think they have the lock and proceed to the critical region, all at the same time! Not good!
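Concretely, here is one losing interleaving (the thread names are illustrative):

// state == unlocked; threads A and B call lock() at the same time.
//
//   Thread A                          Thread B
//   while (state == locked) {}
//     (sees unlocked, exits loop)
//                                     while (state == locked) {}
//                                       (sees unlocked, exits loop)
//   state = locked;
//                                     state = locked;
//   (enters critical region)          (enters critical region)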
In order to properly implement a mutex, we need atomic read-modify-write operations. std::atomic provides them in the form of the operator++ and operator-- operators. The following mutex implementation should be correct:
struct mutex {
    std::atomic<int> state = 0;

    void lock() {
        while (++state != 1) {
            --state;
        }
    }

    void unlock() {
        --state;
    }
};
Note that the unlock() method performs an atomic decrement operation, instead of simply writing 0. Simply storing 0 to the mutex state is incorrect: if this store occurred between the ++state and --state steps in the while loop in the lock() method, state would become negative and the while loop could never exit.
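To see why, consider the following interleaving under a hypothetical unlock() that stores 0. Thread A holds the lock (state == 1) and thread B is spinning inside lock():

//   Thread B (in lock())              Thread A (hypothetical store-0 unlock)
//   ++state;    // state == 2
//                                     state = 0;
//   --state;    // state == -1
//   ++state;    // state == 0, != 1, so B stays in the loop
//   --state;    // state == -1
//   ... state can never reach 1 again, so B spins forever.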
With the help of a mutex, we can build objects that support synchronized accesses with more complex semantics.
Bounded buffers
A bounded buffer is a synchronized object that supports the following operations:
- read(buf, n): reads up to n chars from the bounded buffer to buf;
- write(buf, n): writes up to n chars into the bounded buffer from buf.
The bounded buffer is the abstraction used to implement pipes and non-seekable stdio caches.
Unsynchronized buffer
Let's first look at a bounded buffer implementation, bbuffer-basic.cc, which does not perform any synchronization.
struct bbuffer {
    static constexpr size_t bcapacity = 128;
    char bbuf_[bcapacity];     // circular buffer storage
    size_t bpos_ = 0;          // index of the next character to be read
    size_t blen_ = 0;          // number of bytes currently in the buffer
    bool write_closed_ = false;

    ssize_t read(char* buf, size_t sz);
    ssize_t write(const char* buf, size_t sz);
    void shutdown_write();
};

ssize_t bbuffer::write(const char* buf, size_t sz) {
    assert(!this->write_closed_);
    size_t pos = 0;
    while (pos < sz && this->blen_ < bcapacity) {
        size_t bindex = (this->bpos_ + this->blen_) % bcapacity;
        size_t bspace = std::min(bcapacity - bindex, bcapacity - this->blen_);
        size_t n = std::min(sz - pos, bspace);
        memcpy(&this->bbuf_[bindex], &buf[pos], n);
        this->blen_ += n;
        pos += n;
    }
    if (pos == 0 && sz > 0) {
        return -1;    // try again
    } else {
        return pos;
    }
}

ssize_t bbuffer::read(char* buf, size_t sz) {
    size_t pos = 0;
    while (pos < sz && this->blen_ > 0) {
        size_t bspace = std::min(this->blen_, bcapacity - this->bpos_);
        size_t n = std::min(sz - pos, bspace);
        memcpy(&buf[pos], &this->bbuf_[this->bpos_], n);
        this->bpos_ = (this->bpos_ + n) % bcapacity;
        this->blen_ -= n;
        pos += n;
    }
    if (pos == 0 && sz > 0 && !this->write_closed_) {
        return -1;    // try again
    } else {
        return pos;
    }
}
It implements what we call a circular buffer. Every time we read a character out of the buffer, we increment bpos_, which is the index of the next character to be read in the buffer. Whenever we write to the buffer, we increment blen_, which is the number of bytes currently occupied in the buffer. The buffer is circular, which is why all index arithmetic is modulo the total capacity of the buffer.
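For example, consider a hypothetical buffer state (the numbers are chosen purely for illustration):

//   bcapacity == 128, bpos_ == 120, blen_ == 12
//
// The 12 occupied bytes wrap around the end of the array: indexes 120..127
// hold the first 8 bytes and indexes 0..3 hold the remaining 4. The next
// character to read is bbuf_[120], and the next write starts at index
// (bpos_ + blen_) % bcapacity == (120 + 12) % 128 == 4.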
When there is just one thread accessing the buffer, it works perfectly fine.
But does it work when multiple threads are using the buffer at the same time?
In our test program in bbuffer-basic.cc, we have one reader thread reading from the buffer and a second writer thread writing to the buffer. We would expect everything written to the buffer by the writer thread to show up exactly as it was written once read out by the reader thread.
It does not work, because there is no synchronization over the internal state of the bounded buffer. bbuffer::read() and bbuffer::write() both modify the internal state of the bbuffer object (most critically bpos_ and blen_), and such accesses require synchronization to work correctly in a multi-threaded environment. One way to fix this is to wrap the function bodies of the read() and write() methods in critical regions, using a mutex.
A correct version of a synchronized bounded buffer can be found in bbuffer-mutex.cc. Key differences from the unsynchronized version are highlighted below:
struct bbuffer {
    ...
    std::mutex mutex_;
    ...
};

ssize_t bbuffer::write(const char* buf, size_t sz) {
    this->mutex_.lock();
    ...
    this->mutex_.unlock();
    if (pos == 0 && sz > 0) {
        return -1;    // try again
    } else {
        return pos;
    }
}

ssize_t bbuffer::read(char* buf, size_t sz) {
    this->mutex_.lock();
    ...
    this->mutex_.unlock();
    if (pos == 0 && sz > 0 && !this->write_closed_) {
        return -1;    // try again
    } else {
        return pos;
    }
}
This correctly implements a synchronized bounded buffer. Simply wrapping accesses to shared state within critical regions using a mutex is the easiest, and probably also the most common, way to make complex in-memory objects synchronized (or thread-safe).
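To see the synchronized buffer in action, here is a minimal two-thread sketch in the spirit of the lecture's test program. The thread bodies are illustrative (they are not copied from bbuffer-mutex.cc), and the sketch assumes shutdown_write() sets write_closed_ while holding the mutex; both threads simply retry when a call returns -1:

#include <cstdio>
#include <cstring>
#include <thread>

void writer_thread(bbuffer* bb, const char* msg) {
    size_t n = strlen(msg);
    size_t pos = 0;
    while (pos < n) {
        ssize_t r = bb->write(&msg[pos], n - pos);
        if (r > 0) {
            pos += r;
        }
        // r == -1 means the buffer was full; retry.
    }
    bb->shutdown_write();
}

void reader_thread(bbuffer* bb) {
    char buf[16];
    while (true) {
        ssize_t r = bb->read(buf, sizeof(buf));
        if (r > 0) {
            fwrite(buf, 1, r, stdout);
        } else if (r == 0) {
            break;    // writer closed and buffer fully drained
        }
        // r == -1 means the buffer was empty; retry.
    }
}

int main() {
    bbuffer bb;
    std::thread w(writer_thread, &bb, "Hello from the writer thread!");
    std::thread r(reader_thread, &bb);
    w.join();
    r.join();
}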
Using one mutex for the entire object is what we call coarse-grained synchronization. It is correct, but it also limits concurrency. Under this scheme, whenever a writer is writing to the buffer, the reader can't acquire the lock and must wait until the writer finishes. The reader and the writer can never truly read and write the buffer at the same time. We will show in the next lecture that we can achieve more concurrency by using a different synchronization object.