Synchronization 2: Threads and low-level atomicity
The operating system kernel aims to provide unprivileged processes with safe, useful abstractions for every important hardware resource. Virtual memory abstracts physical memory; signals abstract interrupts; and file descriptors abstract storage devices. A thread is the user-level, unprivileged model for a processor. Just as a modern computer often contains multiple processors operating on the same memory, a process can contain multiple threads operating on the same memory and resources.
Pthreads library
Every process has at least one thread. A Unix process that needs two or more threads will generally use pthreads, which stands for POSIX Threads. (POSIX is a Unix standard.) Gain access to the pthread library by adding #include <pthread.h> to source code and then linking with -lpthread (that is, the pthread library) when it comes time to build an executable.
Threads in a process execute independently, but share data. Thus, they share the same address space, including code and heap; they share kernel resources, such as their file descriptor table; and any thread can access data on any other thread’s stack. To allow threads to execute independently, they have separate register sets—multiple threads in a process can be executing different code at the same time. A special exception is made for stack data. Although all thread stacks are part of the same address space, each thread has a disjoint range of addresses reserved for its stack. This allows different threads to call independent functions and access independent local variables.
System calls used for process management have analogues in terms of threads. Specifically:
Creation

The fork() system call creates a process. The new process contains a copy of the old process, and runs the same code.

To create a thread, call pthread_create. This function has the following signature:

int pthread_create(pthread_t* threadid, const pthread_attr_t* attr,
                   void* (*thread_func)(void*), void* thread_arg);

This function, like most pthread functions, returns 0 on success and a positive error code on failure (e.g., EAGAIN or EINVAL). Note that this differs from system calls, which return -1 on failure and set errno to the error code. If the function succeeds, then it sets *threadid to the ID of the new thread.
The fork and pthread_create functions differ in important ways. The most significant is that, unlike fork (which runs the same code in the parent and the child), pthread_create takes the address of the code the thread should run: the new thread starts by calling thread_func(thread_arg), and ends when thread_func returns. This makes a lot of sense if you think about the differences between threads and processes. With fork, the parent and child process are distinguished by the return value. This return value is stored in a register, and quickly thereafter in memory. Since the parent and child’s address spaces are independent and isolated, there’s no problem: the same address can contain different data in the two processes. But this isn’t true of threads! The new thread shares the same address space as the creating thread, so there’s no way to “return” different values to the two threads. The pthread creation function thus appears to “return” only to the creating thread. The created thread runs an entirely new function.
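To make this concrete, here is a minimal sketch of thread creation (the file and variable names are our own, not from the lecture code):

#include <pthread.h>
#include <stdio.h>

// The new thread starts here, with thread_arg as its argument.
void* thread_func(void* thread_arg) {
    int id = *(int*) thread_arg;
    printf("hello from thread %d\n", id);
    return NULL;
}

int main(void) {
    pthread_t tid;
    int arg = 1;
    int r = pthread_create(&tid, NULL, thread_func, &arg);
    if (r != 0) {
        fprintf(stderr, "pthread_create failed with error %d\n", r);
        return 1;
    }
    pthread_join(tid, NULL); // wait for the thread (discussed below)
}

Build it with something like cc hello.c -o hello -lpthread.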
Exit/termination

The exit(int status) system call exits a process. To exit a thread, call pthread_exit(void* value), or return value; from the thread routine.

Whereas exit returns an integer status for which the parent process can wait, the pthread_exit function returns a pointer value for which any sibling thread can wait. Why the difference in types? It’s not meaningful to pass a pointer from one process to another, since their address spaces are independent. Threads, however, share an address space, so passing a pointer can be meaningful—and far more powerful than passing an integer. However, some care is required; just as a function should not return a pointer to a local variable, a thread should not exit with a pointer to a local variable, since the thread’s stack is destroyed when it exits.
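To illustrate the danger, here is a sketch of our own (not from the lecture code): the first thread routine is broken, the second is safe.

#include <stdlib.h>

void* bad_routine(void* arg) {
    int result = 42;
    return &result;   // WRONG: result lives on this thread's stack,
                      // which is destroyed when the thread exits
}

void* good_routine(void* arg) {
    int* result = (int*) malloc(sizeof(int));
    *result = 42;     // OK: heap storage outlives the thread;
    return result;    // the thread that joins us must free() it
}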
Wait

The waitpid(pid_t process, int* status, int flags) system call waits for a child process to complete; it blocks if flags == 0. To wait for a thread to complete, call pthread_join(pthread_t thread, void** status). Just as with waitpid, the thread status is stored in *status, but here that status is a pointer. pthread_join always blocks.

waitpid can only be used to wait for a child—and, to avoid zombie processes, it is typically called for every child. In contrast, pthread_join may be called for any sibling thread: parent-child relationships are not meaningful among threads. The pthread_join function can only wait for one specific thread named by the thread argument; there is no equivalent to waitpid(0, ...) or waitpid(-pgid, ...), which wait for any child in a group of children. In practice, however, this doesn’t restrict flexibility. Since threads share memory, memory structures, such as queues or linked lists, and synchronization objects, such as mutexes, can obtain the same effect.
Normally every created thread should be joined, or the process will collect zombie threads. However, a thread can be explicitly detached using the pthread_detach(pthread_t thread) function. A detached thread vanishes completely on exit; there is no need to join it (and, in fact, it cannot be joined).
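Putting exit, join, and detach together in one sketch (again with names of our own):

#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>

void* worker(void* arg) {
    int* result = (int*) malloc(sizeof(int));
    *result = 2 * *(int*) arg;
    return result;   // equivalent to pthread_exit(result)
}

int main(void) {
    pthread_t t1, t2;
    int a1 = 10, a2 = 20;
    pthread_create(&t1, NULL, worker, &a1);
    pthread_create(&t2, NULL, worker, &a2);

    void* status;
    pthread_join(t1, &status);   // blocks until t1 exits
    printf("t1 returned %d\n", *(int*) status);
    free(status);

    pthread_detach(t2);   // t2 cleans up after itself and cannot be joined
    pthread_exit(NULL);   // exit main's thread without terminating t2
}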
Thread and process IDs

The pid_t getpid() system call returns the current process’s process ID. pid_t is always a synonym for int.

The pthread_t pthread_self() function returns the current thread’s thread ID. However, pthread_t is not necessarily a synonym for an integer type. On Linux, pthread_t is the same as unsigned long, but on Mac OS X, it is a pointer type. In general it is not safe to print pthread_t values or even to compare them with ==; use the pthread_equal(t1, t2) function to see if t1 and t2 are the same thread ID.
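For instance, here is a portable way to check whether a thread ID names the current thread (a small sketch of our own):

#include <pthread.h>

int is_current_thread(pthread_t t) {
    // Compare thread IDs with pthread_equal, never with ==
    return pthread_equal(pthread_self(), t);
}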
Race conditions and increment
We turn now to four varieties of a four-thread increment program. This program starts four threads, each of which increments an unsigned integer value 10,000,000 times. The program waits for the threads and prints the final integer result. Check out synch2/incr.c for the simplest version.
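Here is a sketch of the program’s structure, reconstructed from this description (the actual synch2/incr.c may differ in details):

#include <pthread.h>
#include <stdio.h>

void* threadfunc(void* arg) {
    unsigned* x = (unsigned*) arg;
    for (int i = 0; i != 10000000; ++i) {
        *x += 1;   // unsynchronized increment of shared memory
    }
    return NULL;
}

int main(void) {
    pthread_t th[4];
    unsigned n = 0;
    for (int i = 0; i != 4; ++i) {
        pthread_create(&th[i], NULL, threadfunc, &n);
    }
    for (int i = 0; i != 4; ++i) {
        pthread_join(th[i], NULL);
    }
    printf("%u\n", n);
}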
Unsynchronized access to memory and undefined behavior
Running ./incr generally prints 40000000—but sometimes it prints 30000000!!! Why?
First, the compiler’s optimizer has optimized this loop
unsigned* x = ...;
for (int i = 0; i != 10000000; ++i) {
    *x += 1;
}
into the simpler, but equivalent
*x += 10000000;
This is usually what we would want—thank you, compiler! However, for pedagogical purposes, we want 10,000,000 separate increments. (Parallel code often wants to access the most up-to-date contents of memory.) We can achieve this either by turning off optimization, or by using a special type qualifier, volatile, which tells the compiler not to optimize access to *x:
volatile unsigned* x = ...;
for (int i = 0; i != 10000000; ++i) {
    *x += 1;
}
Running ./incr-volatile produces dispiriting results:
11921889
10447110
13557412
12406568
13879006
10371929
12800267
11600467
11664183
10562197
Fully 3/4 of the increments are going missing, and we get a different result every time! (Unoptimized ./incr-noopt works similarly, but even worse: we sometimes see results smaller than 10,000,000.)
What’s happening here is that our code invokes undefined behavior. The C abstract machine requires that concurrent access to shared memory be explicitly synchronized. Specifically:
Unsynchronized concurrent access to memory by at least two threads, where at least one of the accesses is a write, causes undefined behavior.
Our accesses are not synchronized, because no C memory accesses are synchronized by default. And many of our accesses are writes. So that means our program invokes undefined behavior, and the compiler has no responsibility to make our code produce the “expected” result. There should be no surprise that the result isn’t 40000000.
Concurrent access to x86-64 memory
Undefined behavior’s purpose is to allow the compiler to produce faster code. We find undefined behavior when strictly well-defined behavior would be expensive to enforce. And that’s the case for concurrent memory accesses on fast x86-64 machines.
These machines have what’s called a relaxed memory model. On a relaxed memory model machine, when multiple processors access the same memory at the same time, weird results can occur. For instance, consider two threads simultaneously executing incl u, where the integer global variable u is initially 0. Under a strict memory model, each instruction would execute independently, and u would become 2. But on x86-64, 1 is allowed too!
Real machines use relaxed memory models because that can make caches much faster. On x86-64 multiprocessor machines, different processors can have independent caches. Those caches must coordinate, and cache coordination is slow, but relaxed memory models allow the coordination to happen less often. The cost is weird results.
For instance, consider two x86-64 processors with independent level-1 caches, each executing incl u. How might that work? We’d start out with a picture like this: each processor has an empty L1 cache, and the u variable’s current value is in primary physical memory.
| | | | | |
—+——+——+— —+——+——+—
| | | |
—| P1 |— —| P2 |—
| | | |
—+——+——+— —+——+——+—
| | | | | |
+---------+ +---------+
| | | |
+---------+ +---------+
. . . . . . . . . . . . . . . . . . . . . .
. .
. u = 0 .
. .
. . . . . . . . . . . . . . . . . . . . . .
Now, even though incl u looks like a single unit, the processor actually executes it in multiple steps. First, it loads u’s current value into the cache; second, it updates the cached value; and third, at some point thereafter, it writes back the updated cache line to primary memory. On a uniprocessor, this complexity is hidden from the programmer. But not on a multiprocessor: different orders of the substeps have different results.
The steps look like this:
| | | | | |
—+——+——+— incl u: —+——+——+— incl u:
| | P1.1 load u | | P2.1 load u
—| P1 |— to cache —| P2 |— to cache
| | P1.2 update | | P2.2 update
—+——+——+— P1.3 write back —+——+——+— P2.3 write back
| | | | | |
+---------+ +---------+
| | | |
+---------+ +---------+
. . . . . . . . . . . . . . . . . . . . . .
. .
. u = 0 .
. .
. . . . . . . . . . . . . . . . . . . . . .
Let’s say that these steps execute in the order P1.1, P1.2, P1.3, P2.1, P2.2, P2.3. Then we get the final value 2 for u. But what about a different order—say, P1.1, P1.2, P2.1, P2.2, P1.3, P2.3? Then we get 1. After the first four steps, the picture looks like this:
| | | | | |
—+——+——+— incl u: —+——+——+— incl u:
| | P1.1✓load u | | P2.1✓load u
—| P1 |— to cache —| P2 |— to cache
| | P1.2✓update | | P2.2✓update
—+——+——+— P1.3 write back —+——+——+— P2.3 write back
| | | | | |
+---------+ +---------+
| u = 1 | | u = 1 |
+---------+ +---------+
. . . . . . . . . . . . . . . . . . . . . .
. .
. u = 0 .
. .
. . . . . . . . . . . . . . . . . . . . . .
And eventually u will get the value 1. Different results with different scheduling orders: we have a race condition.
Relaxed memory model behavior is not identical to undefined behavior, but they are related. On concrete x86-64 machines, every set of instructions has defined behavior: undefined behavior is only a concept in the C abstract machine. It’s just that the x86-64 behavior has no easy-to-specify analogue in the abstract machine. (The abstract machine has no concept for “sometimes writes go missing,” and in practice even weirder situations can arise.) When real machine behavior is too hard to model in the abstract machine, and too expensive for compilers to work around, the C standard resorts to undefined behavior as a kind of maximal generalization of every weird thing that could happen.
Atomic increment
Although memory accesses in C are by default unsynchronized, recent versions of the C standard let us explicitly request synchronization. For instance, we can declare our u variable using a special atomic integer type, _Atomic unsigned u. Such variables are accessed using special atomic operations declared in <stdatomic.h>.
- To initialize an atomic variable, use _Atomic TYPE var = ATOMIC_VAR_INIT(constant) or atomic_init(&var, value).
- To read the variable, use atomic_load(&var).
- To write the variable, use atomic_store(&var, value).
- To atomically add a value to a variable, use atomic_fetch_add(&var, value). This performs the equivalent of var += value, but in one atomic step. This function call returns the old value of var (the value immediately before the addition). atomic_fetch_sub, atomic_fetch_and, atomic_fetch_or, and atomic_fetch_xor are the atomic equivalents of -=, &=, |=, and ^=, respectively.
(We’ll look later at atomic_exchange and atomic_compare_exchange.) The C standard defines these atomic operations as synchronized accesses, so multiple threads can access the same memory simultaneously without causing undefined behavior. The compiler must implement these operations so as to provide the expected results, such as atomic behavior for atomic_fetch_add. Effectively, each atomic operation is its own tiny critical section!
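The old-value return of atomic_fetch_add is useful in its own right. For example, this sketch of our own hands out distinct indices to concurrently-running threads:

#include <stdatomic.h>

_Atomic unsigned next_index = ATOMIC_VAR_INIT(0);

// Each call returns a distinct index, even under concurrency:
// the read and the increment happen in one atomic step.
unsigned claim_index(void) {
    return atomic_fetch_add(&next_index, 1);
}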
(In class, we used the compiler builtin functions that underlie the standard functions. These functions have names like __atomic_fetch_add, and allow us to operate atomically on a variable that was declared with a non-atomic type. See the synch2/incr-atomic.c file for an example.)
Our loop now looks like this:
_Atomic unsigned* x = ...;
for (int i = 0; i != 10000000; ++i) {
    atomic_fetch_add(x, 1);
}
And it works! Running this, we get 40000000 every time (and there’s no undefined behavior). It is slower—roughly 6x slower on my laptop. This is the cost of the strict memory model: the cache manipulation required to enforce strictness, which involves mutual exclusion, is quite expensive. But slow and correct is better than fast and wrong.
Why is it slower, and how did the compiler implement it? If we look at the instructions, we see that the previous incl (%rdi) instruction has changed to:
100000e60: f0 ff 07 lock incl (%rdi)
The compiler has used special x86-64 instructions, called atomic instructions, that force the processor to use a strict memory model. Most of these instructions are distinguished by a lock prefix.
Mutual exclusion: Spinlock
The mutual exclusion lock primitives, lock and unlock, are easy to implement with C-standard atomic accesses. In fact, the C standard comes with some special types designed exactly for that purpose. Here’s an example of their use:
atomic_flag lock = ATOMIC_FLAG_INIT;

void spin_lock(atomic_flag* l) {
    while (atomic_flag_test_and_set(l)) {
    }
}

void spin_unlock(atomic_flag* l) {
    atomic_flag_clear(l); // “*l = 0;”
}
ATOMIC_FLAG_INIT initializes the flag to zero. The atomic_flag_test_and_set operation behaves like this:
int atomic_flag_test_and_set(atomic_flag* l) {
    // The following two lines execute as ONE ATOMIC STEP:
    int old_value = l->value;
    l->value = 1;
    return old_value;
}
Since this function is in a while loop condition, spin_lock will spin until the flag is zero, which means the lock is in the unlocked state. If spin_lock detects the unlocked state, the loop terminates. However, the test_and_set operation not only detects the unlocked state, it also atomically changes the state to locked. Since the atomic operations are synchronized, there’s no relaxed memory order behavior: when multiple threads are trying to lock, only one of them will detect the unlocked state and exit the loop. These functions provide mutual exclusion.
This kind of lock is called a spinlock because the lock function spins until it can acquire the lock. Spinning is a form of polling, in that the thread remains runnable as it waits. We generally use the term “spinning” when the process is polling in a tight loop on memory (as opposed to, for example, polling using system calls, or alternating between polling and performing other work).
We can use mutual exclusion primitives to enforce critical sections, and that, in turn, can synchronize other access to memory. Here’s how:
atomic_flag spinlock = ATOMIC_FLAG_INIT;
...
unsigned* x = (unsigned*) arg;
for (int i = 0; i != 10000000; ++i) {
    spin_lock(&spinlock);
    *x += 1;
    spin_unlock(&spinlock);
}
This code (in incr-spinlock.c) produces the correct answer. And it has no undefined behavior, even though the shared *x variable has a normal type and uses normal operations (rather than an atomic type and special atomic operations). This is OK because mutual exclusion is provided by the spin_lock and spin_unlock functions, and those functions are implemented using synchronized accesses. The spinlock ensures that at most one concurrent thread can access *x at a time.
How fast is the spinlock? On my laptop’s Linux VM, it’s about 6x slower than the atomic increment. But it has a critical advantage over atomic increments, namely that any code can be placed inside the critical section enforced by the spinlock, including instructions that have no atomic versions.
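For example, here is a sketch of our own (reusing the spin_lock and spin_unlock functions above) that keeps two counters consistent with each other, something no single atomic instruction can do:

atomic_flag stats_lock = ATOMIC_FLAG_INIT;
unsigned long total = 0;
unsigned long count = 0;

void record_sample(unsigned long value) {
    spin_lock(&stats_lock);
    total += value;   // both updates happen inside one critical section,
    count += 1;       // so other threads always observe a consistent pair
    spin_unlock(&stats_lock);
}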
Mutual exclusion: Blocking mutex
A mutex (or, for clarity, a blocking mutex) is a form of mutual exclusion lock that blocks when the lock is busy, rather than spins. Implementation-wise, a mutex combines the atomicity of a spinlock with an underlying wait queue, where the wait queue puts a locking thread to sleep until the lock becomes available.
Mutexes are often a good choice for mutual exclusion, and they are the default form of mutual exclusion provided by the pthreads library (and useful in your problem set). Here’s how the pthread variant works:
pthread_mutex_t mutex;
pthread_mutex_init(&mutex, NULL); // the NULL is for an attributes structure
...
unsigned* x = (unsigned*) arg;
for (int i = 0; i != 10000000; ++i) {
    pthread_mutex_lock(&mutex);
    *x += 1;
    pthread_mutex_unlock(&mutex);
}
Here, the pthread_mutex operations provide mutual exclusion and synchronization. Their implementations, though, are built on the same foundations we’ve used above, namely synchronized memory accesses and atomic instructions.
On my laptop’s Linux VM, the pthread_mutex operations perform faster than the spinlock! (Your mileage may vary.) This isn’t because the mutex is blocking, it’s because the Linux mutex implementation is smarter than our simple spinlock implementation (take a look at the instructions to see how). But on Mac OS X, the mutex version is 36x slower in real time terms—even though it’s slightly faster in terms of user CPU (because it spins less)! Blocking doesn’t aim to be faster, it aims to be more efficient.
Bounded buffer
We now return to our bounded buffer implementation from last lecture. How can we synchronize access in a multithreaded version of our bounded buffer code?
The bounded buffer application sets up a bounded buffer for communication between two threads, a writer and a reader. The writer writes 1,000,000 copies of the 13-character message "Hello world!\n" to the bounded buffer, then shuts down the write side. The reader copies the contents of the bounded buffer to standard output, then quits. So if the bounded buffer implementation were correct, ./bbuffer | wc -c would report 13000000 characters, and ./bbuffer | uniq -c would report a single line, Hello world!, because the output would contain 1,000,000 copies of that line. But that’s not what we see:
$ ./bbuffer | wc -c
19535277
$ ./bbuffer | uniq
Hello world!
Hello wor!
HeHello world!
Hello world!
llo world!
Hello Hello world!
world!
Hello wor!
HeHello Hello world!
world!
Hello worlHello wo world!
Hello world!
...
To make the bounded buffer correct, we need to add synchronization primitives—mutexes. Narrowly speaking, we must avoid undefined behavior, by ensuring that all accesses to shared memory are safe (synchronized). Broadly speaking, we must also ensure that our functions do the right thing.
Unnecessary synchronization
First, is synchronization necessary here?
bbuffer* bbuffer_new(void) {
    bbuffer* bb = (bbuffer*) malloc(sizeof(bbuffer));
    bb->pos = 0;
    bb->len = 0;
    bb->write_closed = 0;
    return bb;
}
No! The malloc function returns fresh, unused memory, so no other thread can access bb->pos or bb->len until the bb pointer is published somewhere (in a global or on the heap). (Unless, of course, the program is invalid and contains use-after-free or wild memory accesses, in which case all bets are off.) Local variables, and heap values that are only accessible via local variables, rarely need synchronization.
Correct synchronization
This code synchronizes correctly, using a per-bounded-buffer mutex.
typedef struct {
    char buf[BBUFFER_SIZE];
    size_t pos;
    size_t len;
    int write_closed;
    pthread_mutex_t mutex;
} bbuffer;

bbuffer* bbuffer_new(void) {
    bbuffer* bb = (bbuffer*) malloc(sizeof(bbuffer));
    bb->pos = 0;
    bb->len = 0;
    bb->write_closed = 0;
    pthread_mutex_init(&bb->mutex, NULL);
    return bb;
}

ssize_t bbuffer_write(bbuffer* bb, const char* buf, size_t sz) {
    size_t pos = 0;
    pthread_mutex_lock(&bb->mutex);
    assert(!bb->write_closed);
    while (pos < sz && bb->len < sizeof(bb->buf)) {
        size_t bb_index = (bb->pos + bb->len) % sizeof(bb->buf);
        size_t ncopy = sz - pos;
        if (ncopy > sizeof(bb->buf) - bb_index) {
            ncopy = sizeof(bb->buf) - bb_index;
        }
        if (ncopy > sizeof(bb->buf) - bb->len) {
            ncopy = sizeof(bb->buf) - bb->len;
        }
        memcpy(&bb->buf[bb_index], &buf[pos], ncopy);
        bb->len += ncopy;
        pos += ncopy;
    }
    pthread_mutex_unlock(&bb->mutex);
    if (pos == 0 && sz > 0) {
        return -1; // try again
    } else {
        return pos;
    }
}
Similar changes are made to the read function.
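For reference, here is a sketch of a correspondingly-synchronized read function. It mirrors the write function’s structure; the actual synch2 code may differ in details.

ssize_t bbuffer_read(bbuffer* bb, char* buf, size_t sz) {
    size_t pos = 0;
    pthread_mutex_lock(&bb->mutex);
    while (pos < sz && bb->len > 0) {
        size_t ncopy = sz - pos;
        if (ncopy > sizeof(bb->buf) - bb->pos) {
            ncopy = sizeof(bb->buf) - bb->pos;   // stay within the circular buffer
        }
        if (ncopy > bb->len) {
            ncopy = bb->len;                     // don't read more than is stored
        }
        memcpy(&buf[pos], &bb->buf[bb->pos], ncopy);
        bb->pos = (bb->pos + ncopy) % sizeof(bb->buf);
        bb->len -= ncopy;
        pos += ncopy;
    }
    int write_closed = bb->write_closed;
    pthread_mutex_unlock(&bb->mutex);
    if (pos == 0 && sz > 0 && !write_closed) {
        return -1;   // try again
    } else {
        return pos;
    }
}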
We chose these critical sections by wrapping all accesses to mutable shared state. In other words, we took the unsynchronized code; looked for accesses to shared state that could be modified by concurrent threads; and expanded those accesses to a contiguous region. (Here, the whole while loop and the preceding assert form a contiguous region. The last if and return statements are not part of the region since they do not access shared state.) Then we put pthread_mutex_lock at the start of the region and pthread_mutex_unlock at the end. We ensure that all paths into the region lock the mutex and that all paths out of the region unlock it. (For instance, if the body of the while loop contained a return statement, we’d put pthread_mutex_unlock immediately before it.) And that is it.
This methodology prevents undefined behavior and produces simple, coarse-grained critical sections. In many, even most, cases, it produces critical sections that are sufficient for correctness.
Thought experiment: Insufficient critical sections
Large critical sections have costs: if the entire program were in one critical section—if every thread started with pthread_mutex_lock(&giant_mutex) and ended with pthread_mutex_unlock(&giant_mutex)—then there would be no parallelism. Smaller critical sections allow more parallelism. But be careful: make critical sections too small and the program breaks.
For instance, consider this code, which breaks the body of the while loop into two critical sections.
ssize_t bbuffer_write(bbuffer* bb, const char* buf, size_t sz) {
    size_t pos = 0;
    pthread_mutex_lock(&bb->mutex);
    assert(!bb->write_closed);
    while (pos < sz && bb->len < sizeof(bb->buf)) {
        size_t bb_index = (bb->pos + bb->len) % sizeof(bb->buf);
        size_t ncopy = sz - pos;
        if (ncopy > sizeof(bb->buf) - bb_index) {
            ncopy = sizeof(bb->buf) - bb_index;
        }
        if (ncopy > sizeof(bb->buf) - bb->len) {
            ncopy = sizeof(bb->buf) - bb->len;
        }
        pthread_mutex_unlock(&bb->mutex);
        pthread_mutex_lock(&bb->mutex);
        memcpy(&bb->buf[bb_index], &buf[pos], ncopy);
        bb->len += ncopy;
        pos += ncopy;
    }
    pthread_mutex_unlock(&bb->mutex);
    if (pos == 0 && sz > 0) {
        return -1; // try again
    } else {
        return pos;
    }
}
This is incorrect, not because of undefined behavior, but because it doesn’t perform the right function. The semantics of bbuffer_write are that characters are written in an atomic unit, and unread characters are never overwritten. To provide those semantics, the bbuffer_write function must effectively lock the buffer for its entire run. Imagine two threads calling the bad bbuffer_write implementation. The first one to lock the mutex will calculate bb_index, the location where it will later write its characters. But then it unlocks the mutex! This allows other threads to run, possibly lock the mutex, and possibly modify bb->buf and bb->len. As a result, when the first thread re-locks the mutex, it might write into a position (bb_index) that was calculated using stale information, causing overwritten characters or gaps in the written data.
Condition variables
Our large critical section makes the bounded buffers correct, but not necessarily efficient. In particular, both write and read poll, rather than block. We can improve efficiency by introducing wait queues, just as we did in the kernel variant.
The pthread synchronization object that encapsulates a wait queue is called a condition variable. Abstractly, a condition variable represents a condition that threads might be interested in waiting for. When a thread wants to wait for the condition to become true, it calls the condition variable’s wait operation. To signal that the condition has become true, a thread calls the condition variable’s signal operation. There’s one important wrinkle, though. Because checking a condition generally requires examining shared state, and accessing shared state usually requires mutual exclusion, condition waiting is integrated with a mutex. This integration avoids subtle race conditions.
A pthread condition variable has type pthread_cond_t and the following operation signatures:

- int pthread_cond_init(pthread_cond_t* cond, const pthread_condattr_t* attributes)
  Initialize the condition variable.
- int pthread_cond_wait(pthread_cond_t* cond, pthread_mutex_t* mutex)
  Blocks the current thread until the condition cond is signaled. The caller must have the mutex locked. The cond_wait operation atomically blocks the current thread and unlocks the mutex (the atomicity is required to avoid race conditions). Once the condition is signaled, this thread re-locks the mutex and the cond_wait operation returns.
- int pthread_cond_signal(pthread_cond_t* cond)
  Signals (i.e., wakes up) one thread that is blocked on cond. If there are no blocked threads, does nothing.
- int pthread_cond_broadcast(pthread_cond_t* cond)
  Signals (i.e., wakes up) all threads blocked on cond.
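Here is the canonical usage pattern as a sketch (with hypothetical names of our own): the waiter re-checks its condition in a while loop, and the signaler changes shared state before signaling.

#include <pthread.h>

pthread_mutex_t m = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t ready = PTHREAD_COND_INITIALIZER;
int condition = 0;   // shared state protected by m

void wait_for_condition(void) {
    pthread_mutex_lock(&m);
    while (!condition) {                 // re-check after every wakeup
        pthread_cond_wait(&ready, &m);   // unlocks m, blocks, re-locks m
    }
    // condition is true and m is locked here
    pthread_mutex_unlock(&m);
}

void make_condition_true(void) {
    pthread_mutex_lock(&m);
    condition = 1;
    pthread_mutex_unlock(&m);
    pthread_cond_signal(&ready);   // wake up one waiter, if any
}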
Blocking bounded buffer
The bounded buffer implementation has two places that might benefit from blocking: the write implementation might want to block if the buffer is full, and the read implementation might want to block if the buffer is empty. These separate conditions naturally imply two condition variables. The nonempty condition variable is signaled when the buffer becomes nonempty (when it gains characters), and the nonfull condition variable is signaled when the buffer becomes nonfull (when it gains space). That is, the write implementation blocks on nonfull and signals nonempty, while the read implementation blocks on nonempty and signals nonfull.
Here’s a section of the code:
typedef struct { ...
    pthread_cond_t nonempty;
    pthread_cond_t nonfull;
} bbuffer;

bbuffer* bbuffer_new(void) { ...
    pthread_cond_init(&bb->nonempty, NULL);
    pthread_cond_init(&bb->nonfull, NULL);
    return bb;
}

ssize_t bbuffer_write(bbuffer* bb, const char* buf, size_t sz) {
    size_t pos = 0;
    pthread_mutex_lock(&bb->mutex);
    assert(!bb->write_closed);
    while (pos < sz) {
        size_t bb_index = (bb->pos + bb->len) % sizeof(bb->buf);
        size_t ncopy = sz - pos;
        if (ncopy > sizeof(bb->buf) - bb_index) {
            ncopy = sizeof(bb->buf) - bb_index;
        }
        if (ncopy > sizeof(bb->buf) - bb->len) {
            ncopy = sizeof(bb->buf) - bb->len;
        }
        memcpy(&bb->buf[bb_index], &buf[pos], ncopy);
        bb->len += ncopy;
        pos += ncopy;
        if (ncopy == 0) {
            if (pos > 0) {
                break;
            }
            pthread_cond_wait(&bb->nonfull, &bb->mutex);
        }
    }
    pthread_mutex_unlock(&bb->mutex);
    if (pos == 0 && sz > 0) {
        return -1; // cannot happen
    } else {
        if (pos > 0) {
            pthread_cond_broadcast(&bb->nonempty);
        }
        return pos;
    }
}
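The read side mirrors this code: it blocks on nonempty when the buffer is empty, and signals nonfull after freeing space. A sketch (reconstructed to match the write function; the actual code may differ):

ssize_t bbuffer_read(bbuffer* bb, char* buf, size_t sz) {
    size_t pos = 0;
    pthread_mutex_lock(&bb->mutex);
    while (pos < sz) {
        size_t ncopy = sz - pos;
        if (ncopy > sizeof(bb->buf) - bb->pos) {
            ncopy = sizeof(bb->buf) - bb->pos;
        }
        if (ncopy > bb->len) {
            ncopy = bb->len;
        }
        memcpy(&buf[pos], &bb->buf[bb->pos], ncopy);
        bb->pos = (bb->pos + ncopy) % sizeof(bb->buf);
        bb->len -= ncopy;
        pos += ncopy;
        if (ncopy == 0) {
            if (pos > 0 || bb->write_closed) {
                break;   // return what we have; 0 means EOF
            }
            pthread_cond_wait(&bb->nonempty, &bb->mutex);
        }
    }
    pthread_mutex_unlock(&bb->mutex);
    if (pos > 0) {
        pthread_cond_broadcast(&bb->nonfull);
    }
    return pos;
}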
Notes
The relaxed memory model on x86-64 machines is called Total Store Order.