2017/Synch2

From CS61

Synchronization 2: Threads and low-level atomicity

The operating system kernel aims to provide unprivileged processes with safe, useful abstractions for every important hardware resource. Virtual memory abstracts physical memory; signals abstract interrupts; and file descriptors abstract storage devices. A thread is the user-level, unprivileged model for a processor. Just as a modern computer often contains multiple processors operating on the same memory, a process can contain multiple threads operating on the same memory and resources.

Pthreads library

Every process has at least one thread. A Unix process that needs two or more threads will generally use pthreads, which stands for POSIX Threads. (POSIX is a Unix standard.) Gain access to the pthread library by adding #include <pthread.h> to source code and then linking with -lpthread (that is, the pthread library) when it comes time to build an executable.

Threads in a process execute independently, but share data. Thus, they share the same address space, including code and heap; they share kernel resources, such as their file descriptor table; and any thread can access data on any other thread’s stack. To allow threads to execute independently, they have separate register sets—multiple threads in a process can be executing different code at the same time. A special exception is made for stack data. Although all thread stacks are part of the same address space, each thread has a disjoint range of addresses reserved for its stack. This allows different threads to call independent functions and access independent local variables.

System calls used for process management have analogues in terms of threads. Specifically:

Creation

The fork() system call creates a process. The new process contains a copy of the old process, and runs the same code.

To create a thread, call pthread_create. This function has the following signature:

int pthread_create(pthread_t* threadid, const pthread_attr_t* attr,
       void* (*thread_func)(void*), void* thread_arg);

This function, like most pthread functions, returns 0 on success and a positive error code on failure (e.g., EAGAIN or EINVAL). Note that this differs from system calls, which return -1 on failure and set errno to the error code. If the function succeeds, then it sets *threadid to the ID of the new thread.

The fork and pthread_create functions differ in important ways. The most significant is that, unlike fork (which runs the same code in the parent and the child), pthread_create takes the address of the code the thread should run: the new thread starts by calling thread_func(thread_arg), and ends when thread_func returns. This makes a lot of sense if you think about the differences between threads and processes. With fork, the parent and child process are distinguished by the return value. This return value is stored in a register, and quickly thereafter in memory. Since the parent and child’s address spaces are independent and isolated, there’s no problem: the same address can contain different data in the two processes. But this isn’t true of threads! The new thread shares the same address space as the creating thread, so there’s no way to “return” different values to the two threads. The pthread creation function thus appears to “return” only to the creating thread. The created thread runs an entirely new function.
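As a rough illustration of this calling convention, the sketch below starts one thread, hands it a pointer argument, and waits for it to finish. The names square_args, square_thread, and run_create_demo are made up for this example; the pthread_join call it uses is described in the Wait section.

```c
#include <assert.h>
#include <pthread.h>
#include <stdio.h>
#include <string.h>

// Hypothetical argument block shared by the creating thread and the new thread.
typedef struct {
    int input;
    int output;
} square_args;

// Thread routine: takes a void* argument and returns a void* result.
static void* square_thread(void* arg) {
    square_args* a = (square_args*) arg;
    a->output = a->input * a->input;
    return NULL;
}

// Starts one thread, waits for it, and returns the value it computed
// (or -1 if the thread could not be created).
int run_create_demo(int input) {
    square_args args = { input, 0 };
    pthread_t tid;
    int r = pthread_create(&tid, NULL, square_thread, &args);
    if (r != 0) {
        // pthread functions return the error code directly; errno is not used.
        fprintf(stderr, "pthread_create: %s\n", strerror(r));
        return -1;
    }
    // Passing &args (a local) is safe only because we join before returning.
    r = pthread_join(tid, NULL);
    assert(r == 0);
    return args.output;
}
```

Calling run_create_demo(7) should yield 49. Note that the new thread never "returns from" pthread_create; it simply begins in square_thread.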

Exit/termination

The exit(int status) system call exits a process.

To exit a thread, call pthread_exit(void* value), or return value; from the thread routine.

Whereas exit returns an integer status for which the parent process can wait, the pthread_exit function returns a pointer value for which any sibling thread can wait. Why the difference in types? It’s not meaningful to pass a pointer from one process to another, since their address spaces are independent. Threads, however, share an address space, so passing a pointer can be meaningful—and far more powerful than passing an integer. However, some care is required; just as a function should not return a pointer to a local variable, a thread should not exit with a pointer to a local variable, since the thread’s stack is destroyed when it exits.

Wait

The waitpid(pid_t process, int* status, int flags) system call waits for a child process to complete; it blocks if flags == 0.

To wait for a thread to complete, call pthread_join(pthread_t thread, void** status). Just as with waitpid, the thread status is stored in *status, but here that status is a pointer. pthread_join always blocks.

waitpid can only be used to wait for a child, and, to avoid zombie processes, it is typically called for every child. In contrast, pthread_join may be called for any sibling thread: parent-child relationships are not meaningful among threads. The pthread_join function can only wait for one specific thread named by the thread argument; there is no equivalent to waitpid(0, ...) or waitpid(-pgid, ...), which wait for any child in a group of children. In practice, however, this doesn’t restrict flexibility. Since threads share memory, the same effect can be obtained with shared in-memory structures, such as queues or linked lists, protected by synchronization objects, such as mutexes.
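The exit and wait operations can be sketched together. In this minimal example (the names make_answer and run_join_demo are hypothetical), the thread exits with a pointer to heap memory rather than to one of its own local variables, and the joining thread collects that pointer through the status argument.

```c
#include <assert.h>
#include <pthread.h>
#include <stdlib.h>

// Thread routine: exits with a heap-allocated result, never a pointer
// into its own stack.
static void* make_answer(void* arg) {
    int* result = (int*) malloc(sizeof(int));
    assert(result != NULL);
    *result = *(int*) arg + 1;
    pthread_exit(result);          // same effect as `return result;`
}

// Creates a thread, joins it, and returns the integer it produced.
int run_join_demo(int input) {
    pthread_t tid;
    int r = pthread_create(&tid, NULL, make_answer, &input);
    assert(r == 0);

    void* status;
    r = pthread_join(tid, &status);    // blocks until the thread exits
    assert(r == 0);

    int answer = *(int*) status;
    free(status);                      // the joiner now owns the result
    return answer;
}
```

Here run_join_demo(41) should return 42. Which thread frees the result is a design choice; this sketch gives ownership to the joiner.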

Normally every created thread should be joined, or the process will collect zombie threads. However, a thread can be explicitly detached using the pthread_detach(pthread_t thread) function. A detached thread vanishes completely on exit; there is no need to join it (and, in fact, it cannot be joined).

Thread and process IDs

The pid_t getpid() system call returns the current process’s process ID. On Linux and Mac OS X, pid_t is a synonym for int; POSIX requires only that it be a signed integer type.

The pthread_t pthread_self() function returns the current thread’s thread ID. However, pthread_t is not necessarily a synonym for an integer type. On Linux, pthread_t is the same as unsigned long, but on Mac OS X, it is a pointer type. In general it is not safe to print pthread_t values or even to compare them with ==; use the pthread_equal(t1, t2) function to see if t1 and t2 are the same thread ID.
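A small sketch of portable thread ID comparison follows. The id_check structure and the function names are assumptions made for this example; only pthread_self and pthread_equal come from the pthreads library.

```c
#include <assert.h>
#include <pthread.h>

// Hypothetical record filled in by the new thread.
typedef struct {
    pthread_t creator;       // ID of the thread that called pthread_create
    int same_as_creator;     // does the new thread's ID equal the creator's?
    int same_as_self;        // does the new thread's ID equal itself?
} id_check;

static void* check_ids(void* arg) {
    id_check* c = (id_check*) arg;
    pthread_t me = pthread_self();
    // pthread_equal returns nonzero when the two IDs name the same thread.
    c->same_as_creator = pthread_equal(me, c->creator) != 0;
    c->same_as_self = pthread_equal(me, pthread_self()) != 0;
    return NULL;
}

// Returns 1 if the comparisons come out as expected.
int run_id_demo(void) {
    id_check c = { pthread_self(), -1, -1 };
    pthread_t tid;
    int r = pthread_create(&tid, NULL, check_ids, &c);
    assert(r == 0);
    r = pthread_join(tid, NULL);
    assert(r == 0);
    return c.same_as_self == 1 && c.same_as_creator == 0;
}
```

The comparisons run while both threads are still alive, since a thread ID is only guaranteed to be meaningful during the thread's lifetime.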

Race conditions and increment

We turn now to four varieties of a four-thread increment program. This program starts four threads, each of which increments an unsigned integer value 10,000,000 times. The program waits for the threads and prints the final integer result. Check out synch2/incr.c for the simplest version.

Unsynchronized access to memory and undefined behavior

Running ./incr generally prints 40000000—but sometimes it prints 30000000!!! Why?

First, the compiler’s optimizer has optimized this loop

unsigned* x = ...;
for (int i = 0; i != 10000000; ++i) {
    *x += 1;
}

into the simpler, but equivalent

*x += 10000000;

This is usually what we would want—thank you, compiler! However, for pedagogical purposes, we want 10,000,000 separate increments. (Parallel code often wants to access the most up-to-date contents of memory.) We can achieve this either by turning off optimization, or by using a special type qualifier, volatile, which tells the compiler not to optimize access to *x:

volatile unsigned* x = ...;
for (int i = 0; i != 10000000; ++i) {
    *x += 1;
}

Running ./incr-volatile produces dispiriting results:

11921889
10447110
13557412
12406568
13879006
10371929
12800267
11600467
11664183
10562197

Fully 3/4 of the increments are going missing, and we get a different result every time! (Unoptimized ./incr-noopt works similarly, but even worse: we sometimes see results smaller than 10,000,000.)

What’s happening here is that our code invokes undefined behavior. The C abstract machine requires that concurrent access to shared memory be explicitly synchronized. Specifically:

Unsynchronized concurrent access to memory by at least two threads, where at least one of the accesses is a write, causes undefined behavior.

Our accesses are not synchronized, because no C memory accesses are synchronized by default. And many of our accesses are writes. So that means our program invokes undefined behavior, and the compiler has no responsibility to make our code produce the “expected” result. There should be no surprise that the result isn’t 40000000.

Concurrent access to x86-64 memory

Undefined behavior’s purpose is to allow the compiler to produce faster code. We find undefined behavior when strictly well-defined behavior would be expensive to enforce. And that’s the case for concurrent memory accesses on fast x86-64 machines.

These machines have what’s called a relaxed memory model. On a relaxed memory model machine, when multiple processors access the same memory at the same time, weird results can occur. For instance, consider two threads simultaneously executing incl u, where the integer global variable u is initially 0. Under a strict memory model, each instruction would execute independently, and u would become 2. But on x86-64, 1 is allowed too!

Real machines use relaxed memory models because that can make caches much faster. On x86-64 multiprocessor machines, different processors can have independent caches. Those caches must coordinate, and cache coordination is slow, but relaxed memory models allow the coordination to happen less often. The cost is weird results.

For instance, consider two x86-64 processors with independent level-1 caches, each executing incl u. How might that work? We’d start out with a picture like this: each processor has an empty L1 cache, and the u variable’s current value is in primary physical memory.

       |  |  |                     |  |  |
      —+——+——+—                   —+——+——+—
       |     |                     |     |
      —|  P1 |—                   —|  P2 |—
       |     |                     |     |
      —+——+——+—                   —+——+——+—
       |  |  |                     |  |  |

     +---------+                 +---------+
     |         |                 |         |
     +---------+                 +---------+

   . . . . . . . . . . . . . . . . . . . . . .
 .                                             .
 .                  u = 0                      .
 .                                             .
   . . . . . . . . . . . . . . . . . . . . . .

Now, even though incl u looks like a single unit, the processor actually executes it in multiple steps. First, it loads u’s current value into the cache; second, it updates the cached value; and third, at some point thereafter, it writes back the updated cache line to primary memory. On a uniprocessor, this complexity is hidden from the programmer. But not on a multiprocessor: different orders of the substeps have different results.

The steps look like this:

       |  |  |                     |  |  |
      —+——+——+— incl u:           —+——+——+— incl u:
       |     |  P1.1 load u        |     |  P2.1 load u
      —|  P1 |—      to cache     —|  P2 |—      to cache
       |     |  P1.2 update        |     |  P2.2 update
      —+——+——+— P1.3 write back   —+——+——+— P2.3 write back
       |  |  |                     |  |  |

     +---------+                 +---------+
     |         |                 |         |
     +---------+                 +---------+

   . . . . . . . . . . . . . . . . . . . . . .
 .                                             .
 .                  u = 0                      .
 .                                             .
   . . . . . . . . . . . . . . . . . . . . . .

Let’s say that these steps execute in the order P1.1, P1.2, P1.3, P2.1, P2.2, P2.3. Then we get the final value 2 for u. But what about a different order—say, P1.1, P1.2, P2.1, P2.2, P1.3, P2.3? Then we get 1. After the first four steps, the picture looks like this:

       |  |  |                     |  |  |
      —+——+——+— incl u:           —+——+——+— incl u:
       |     |  P1.1✓load u        |     |  P2.1✓load u
      —|  P1 |—      to cache     —|  P2 |—      to cache
       |     |  P1.2✓update        |     |  P2.2✓update
      —+——+——+— P1.3 write back   —+——+——+— P2.3 write back
       |  |  |                     |  |  |

     +---------+                 +---------+
     |  u = 1  |                 |  u = 1  |
     +---------+                 +---------+

   . . . . . . . . . . . . . . . . . . . . . .
 .                                             .
 .                  u = 0                      .
 .                                             .
   . . . . . . . . . . . . . . . . . . . . . .


And eventually u will get the value 1. Different results with different scheduling orders: we have a race condition.
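The two orderings can be replayed with a toy, single-threaded model of the pictures above: one word of primary memory plus one private cached copy per processor. This is only a simulation of the simplified three-step story (the machine, step, and simulate names are invented here), not a description of how real x86-64 caches work.

```c
#include <assert.h>
#include <stddef.h>

// Toy machine: u in primary memory, and each processor's cached copy of u.
typedef struct {
    unsigned memory;
    unsigned cache[2];
} machine;

typedef enum { LOAD, UPDATE, WRITE_BACK } step_kind;

typedef struct {
    int cpu;            // 0 for P1, 1 for P2
    step_kind kind;
} step;

// Runs the substeps in the given order and returns the final value of u.
unsigned simulate(const step* steps, size_t nsteps) {
    machine m = { 0, { 0, 0 } };
    for (size_t i = 0; i != nsteps; ++i) {
        assert(steps[i].cpu == 0 || steps[i].cpu == 1);
        unsigned* c = &m.cache[steps[i].cpu];
        switch (steps[i].kind) {
        case LOAD:       *c = m.memory; break;   // Px.1 load u to cache
        case UPDATE:     *c += 1;       break;   // Px.2 update cached value
        case WRITE_BACK: m.memory = *c; break;   // Px.3 write back
        }
    }
    return m.memory;
}

// Order P1.1, P1.2, P1.3, P2.1, P2.2, P2.3
unsigned run_sequential_order(void) {
    const step s[] = { {0, LOAD}, {0, UPDATE}, {0, WRITE_BACK},
                       {1, LOAD}, {1, UPDATE}, {1, WRITE_BACK} };
    return simulate(s, sizeof(s) / sizeof(s[0]));
}

// Order P1.1, P1.2, P2.1, P2.2, P1.3, P2.3
unsigned run_interleaved_order(void) {
    const step s[] = { {0, LOAD}, {0, UPDATE}, {1, LOAD},
                       {1, UPDATE}, {0, WRITE_BACK}, {1, WRITE_BACK} };
    return simulate(s, sizeof(s) / sizeof(s[0]));
}
```

In this model run_sequential_order() returns 2 and run_interleaved_order() returns 1: in the second order, both processors load 0 before either writes back.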

Relaxed memory model behavior is not identical to undefined behavior, but they are related. On concrete x86-64 machines, every set of instructions has defined behavior: undefined behavior is only a concept in the C abstract machine. It’s just that the x86-64 behavior has no easy-to-specify analogue in the abstract machine. (The abstract machine has no concept for “sometimes writes go missing,” and in practice even weirder situations can arise.) When real machine behavior is too hard to model in the abstract machine, and too expensive for compilers to work around, the C standard resorts to undefined behavior as a kind of maximal generalization of every weird thing that could happen.

Atomic increment

Although memory accesses in C are by default unsynchronized, recent versions of the C standard let us explicitly request synchronization. For instance, we can declare our u variable using a special atomic integer type, _Atomic unsigned u. Such variables are accessed using special atomic operations declared in <stdatomic.h>.

  • To initialize an atomic variable, use _Atomic TYPE var = ATOMIC_VAR_INIT(constant) or atomic_init(&var, value).
  • To read the variable, use atomic_load(&var).
  • To write the variable, use atomic_store(&var, value).
  • To atomically add a value to a variable, use atomic_fetch_add(&var, value). This performs the equivalent of var += value, but in one atomic step. This function call returns the old value of var (the value immediately before the addition).
  • atomic_fetch_sub, atomic_fetch_and, atomic_fetch_or, and atomic_fetch_xor are the atomic equivalents of -=, &=, |=, and ^=, respectively.

(We’ll look later at atomic_exchange and atomic_compare_exchange.) The C standard defines these atomic operations as synchronized accesses, so multiple threads can access the same memory simultaneously without causing undefined behavior. The compiler must implement these operations so as to provide the expected results, such as atomic behavior for atomic_fetch_add. Effectively, each atomic operation is its own tiny critical section!
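The operations listed above can be exercised on a single thread to see what each one returns. This is only a walkthrough of the API (the function name run_atomic_api_demo is made up); it does not demonstrate concurrency.

```c
#include <assert.h>
#include <stdatomic.h>

// Applies each operation once and returns the final value of the variable.
unsigned run_atomic_api_demo(void) {
    _Atomic unsigned v;
    atomic_init(&v, 0);
    atomic_store(&v, 5);                       // v == 5

    unsigned old = atomic_fetch_add(&v, 3);    // v: 5 -> 8
    assert(old == 5);                          // fetch_* returns the old value
    assert(atomic_load(&v) == 8);

    old = atomic_fetch_sub(&v, 2);             // v: 8 -> 6
    assert(old == 8);

    old = atomic_fetch_or(&v, 0x1);            // v: 6 -> 7
    assert(old == 6);

    old = atomic_fetch_and(&v, 0xC);           // v: 7 -> 4
    assert(old == 7);

    old = atomic_fetch_xor(&v, 0xF);           // v: 4 -> 11
    assert(old == 4);

    return atomic_load(&v);
}
```

The function should return 11. Each fetch_ operation reports the value from immediately before the update, which is what makes these calls useful as building blocks.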

(In class, we used the compiler builtin functions that underlie the standard functions. These functions have names like __atomic_fetch_add, and allow us to operate atomically on a variable that was declared with a non-atomic type. See the synch2/incr-atomic.c file for an example.)

Our loop now looks like this:

_Atomic unsigned* x = ...;
for (int i = 0; i != 10000000; ++i) {
    atomic_fetch_add(x, 1);
}

And it works! Running this, we get 40000000 every time (and there’s no undefined behavior). It is slower—roughly 6x slower on my laptop. This is the cost of the strict memory model: the cache manipulation required to enforce strictness, which involves mutual exclusion, is quite expensive. But slow and correct is better than fast and wrong.
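For reference, here is a self-contained sketch of a four-thread atomic increment. It is not the course's incr-atomic.c; the atomic_counter structure, the function names, and the adjustable per-thread count are assumptions made so the example stays short.

```c
#include <assert.h>
#include <pthread.h>
#include <stdatomic.h>

#define NTHREADS 4

// Hypothetical shared state for this example.
typedef struct {
    _Atomic unsigned value;   // the shared counter
    unsigned nincr;           // increments per thread; set before threads start
} atomic_counter;

static void* atomic_incr_thread(void* arg) {
    atomic_counter* c = (atomic_counter*) arg;
    for (unsigned i = 0; i != c->nincr; ++i) {
        atomic_fetch_add(&c->value, 1);   // one synchronized read-modify-write
    }
    return NULL;
}

// Runs NTHREADS threads that each add 1 to the counter nincr times.
unsigned run_atomic_incr(unsigned nincr) {
    atomic_counter c;
    atomic_init(&c.value, 0);
    c.nincr = nincr;

    pthread_t th[NTHREADS];
    for (int i = 0; i != NTHREADS; ++i) {
        int r = pthread_create(&th[i], NULL, atomic_incr_thread, &c);
        assert(r == 0);
    }
    for (int i = 0; i != NTHREADS; ++i) {
        int r = pthread_join(th[i], NULL);
        assert(r == 0);
    }
    return atomic_load(&c.value);
}
```

With nincr set to 10000000 this corresponds to the experiment above; any smaller count should likewise produce exactly 4 * nincr.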

Why is it slower, and how did the compiler implement it? If we look at the instructions, we see that the previous incl (%rdi) instruction has changed to:

100000e60:   f0 ff 07    lock incl (%rdi)

The compiler has used special x86-64 instructions, called atomic instructions, that force the processor to use a strict memory model. Most of these instructions are distinguished by a lock prefix.

Mutual exclusion: Spinlock

The mutual exclusion lock primitives, lock and unlock, are easy to implement with C-standard atomic accesses. In fact, the C standard comes with some special types designed exactly for that purpose. Here’s an example of their use:

atomic_flag lock = ATOMIC_FLAG_INIT;

void spin_lock(atomic_flag* l) {
    while (atomic_flag_test_and_set(l)) {
    }
}

void spin_unlock(atomic_flag* l) {
    atomic_flag_clear(l);                   // “*l = 0;”
}

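ATOMIC_FLAG_INIT initializes the flag to the clear (zero) state. The atomic_flag_test_and_set operation behaves like the following pseudocode (the real atomic_flag type has no accessible value member, and the real function returns a bool):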

int atomic_flag_test_and_set(atomic_flag* l) {
    // The following two lines execute as ONE ATOMIC STEP:
    int old_value = l->value;
    l->value = 1;
    return old_value;
}

Since this function is in a while loop condition, spin_lock will spin until the lock is zero, which means unlocked state. If spin_lock detects unlocked state, the loop terminates. However, the test_and_set operation not only detects the unlocked state, it also atomically changes the state to locked. Since the atomic operations are synchronized, there’s no relaxed memory order behavior: when multiple threads are trying to lock, only one of them will detect the unlocked state and exit the loop. These functions provide mutual exclusion.

This kind of lock is called a spinlock because the lock function spins until it can acquire the lock. Spinning is a form of polling, in that the thread remains runnable as it waits. We generally use the term “spinning” when the thread is polling in a tight loop on memory (as opposed to, for example, polling using system calls, or alternating between polling and performing other work).

We can use mutual exclusion primitives to enforce critical sections, and that, in turn, can synchronize other access to memory. Here’s how:

atomic_flag spinlock = ATOMIC_FLAG_INIT;
...
unsigned* x = (unsigned*) arg;
for (int i = 0; i != 10000000; ++i) {
    spin_lock(&spinlock);
    *x += 1;
    spin_unlock(&spinlock);
}

This code (in incr-spinlock.c) produces the correct answer. And it has no undefined behavior, even though the shared *x variable has a normal type and uses normal operations (rather than an atomic type and special atomic operations). This is OK because mutual exclusion is provided by the spin_lock and spin_unlock functions, and those functions are implemented using synchronized accesses. The spinlock ensures that at most one concurrent thread can access *x at a time.
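Putting the pieces together, a complete spinlock-protected increment might look like the sketch below. It is not the course's incr-spinlock.c; the spin_counter structure, the thread function name, and the adjustable count are assumptions for this example.

```c
#include <assert.h>
#include <pthread.h>
#include <stdatomic.h>

#define NTHREADS 4

static atomic_flag spinlock = ATOMIC_FLAG_INIT;

static void spin_lock(atomic_flag* l) {
    while (atomic_flag_test_and_set(l)) {
        // spin until the previous state was "clear"
    }
}

static void spin_unlock(atomic_flag* l) {
    atomic_flag_clear(l);
}

// Hypothetical shared state: a plain, non-atomic counter.
typedef struct {
    unsigned value;     // protected by spinlock
    unsigned nincr;     // increments per thread; set before threads start
} spin_counter;

static void* spin_incr_thread(void* arg) {
    spin_counter* c = (spin_counter*) arg;
    for (unsigned i = 0; i != c->nincr; ++i) {
        spin_lock(&spinlock);
        c->value += 1;              // ordinary access inside the critical section
        spin_unlock(&spinlock);
    }
    return NULL;
}

unsigned run_spin_incr(unsigned nincr) {
    spin_counter c = { 0, nincr };
    pthread_t th[NTHREADS];
    for (int i = 0; i != NTHREADS; ++i) {
        int r = pthread_create(&th[i], NULL, spin_incr_thread, &c);
        assert(r == 0);
    }
    for (int i = 0; i != NTHREADS; ++i) {
        int r = pthread_join(th[i], NULL);
        assert(r == 0);
    }
    return c.value;     // safe to read: all threads have been joined
}
```

The final read of c.value needs no lock because pthread_join orders it after every thread's last increment.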

How fast is the spinlock? On my laptop’s Linux VM, it’s about 6x slower than the atomic increment. But it has a critical advantage over atomic increments, namely that any code can be placed inside the critical section enforced by the spinlock, including instructions that have no atomic versions.

Mutual exclusion: Blocking mutex

A mutex (or, for clarity, a blocking mutex) is a form of mutual exclusion lock that blocks when the lock is busy, rather than spins. Implementation-wise, a mutex combines the atomicity of a spinlock with an underlying wait queue, where the wait queue puts a locking thread to sleep until the lock becomes available.

Mutexes are often a good choice for mutual exclusion, and they are the default form of mutual exclusion provided by the pthreads library (and useful in your problem set). Here’s how the pthread variant works:

pthread_mutex_t mutex;
pthread_mutex_init(&mutex, NULL);  // the NULL is for an attributes structure
...
unsigned* x = (unsigned*) arg;
for (int i = 0; i != 10000000; ++i) {
    pthread_mutex_lock(&mutex);
    *x += 1;
    pthread_mutex_unlock(&mutex);
}

Here, the pthread_mutex operations provide mutual exclusion and synchronization. Their implementations, though, are built on the same foundations we’ve used above, namely synchronized memory accesses and atomic instructions.

On my laptop’s Linux VM, the pthread_mutex operations perform faster than the spinlock! (Your mileage may vary.) This isn’t because the mutex is blocking; it’s because the Linux mutex implementation is smarter than our simple spinlock implementation (take a look at the instructions to see how). But on Mac OS X, the mutex version is 36x slower in real-time terms, even though it’s slightly faster in terms of user CPU (because it spins less)! Blocking doesn’t aim to be faster; it aims to be more efficient.
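A complete mutex-protected version of the increment program could be sketched as follows. The mutex_counter structure and function names are invented for this example, and the mutex is stored next to the data it protects rather than in a global.

```c
#include <assert.h>
#include <pthread.h>

#define NTHREADS 4

// Hypothetical shared state: a plain counter and the mutex that guards it.
typedef struct {
    pthread_mutex_t mutex;
    unsigned value;     // protected by mutex
    unsigned nincr;     // increments per thread; set before threads start
} mutex_counter;

static void* mutex_incr_thread(void* arg) {
    mutex_counter* c = (mutex_counter*) arg;
    for (unsigned i = 0; i != c->nincr; ++i) {
        pthread_mutex_lock(&c->mutex);
        c->value += 1;
        pthread_mutex_unlock(&c->mutex);
    }
    return NULL;
}

unsigned run_mutex_incr(unsigned nincr) {
    mutex_counter c = { .value = 0, .nincr = nincr };
    int r = pthread_mutex_init(&c.mutex, NULL);
    assert(r == 0);

    pthread_t th[NTHREADS];
    for (int i = 0; i != NTHREADS; ++i) {
        r = pthread_create(&th[i], NULL, mutex_incr_thread, &c);
        assert(r == 0);
    }
    for (int i = 0; i != NTHREADS; ++i) {
        r = pthread_join(th[i], NULL);
        assert(r == 0);
    }
    pthread_mutex_destroy(&c.mutex);
    return c.value;
}
```

Timing this against the spinlock and atomic versions on your own machine is a reasonable way to check the performance claims above, which depend heavily on the platform.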

Bounded buffer

We now return to our bounded buffer implementation from last lecture. How can we synchronize access to a multithreaded version of our bounded buffer code?

The bounded buffer application sets up a bounded buffer for communication between two threads, a writer and a reader. The writer writes 1,000,000 copies of the 13-character message "Hello world!\n" to the bounded buffer, then shuts down the write side. The reader copies the contents of the bounded buffer to standard output, then quits. So if the bounded buffer implementation were correct, ./bbuffer | wc -c would report 13000000 characters, and ./bbuffer | uniq would report a single line, Hello world!, because the output would contain 1,000,000 copies of that line. But that’s not what we see:

$ ./bbuffer | wc -c
 19535277
$ ./bbuffer | uniq
Hello world!
Hello wor!
HeHello world!
Hello world!
llo world!
Hello Hello world!
world!
Hello wor!
HeHello Hello world!
world!
Hello worlHello wo world!
Hello world!
...

To make the bounded buffer correct, we need to add synchronization primitives—mutexes. Narrowly speaking, we must avoid undefined behavior, by ensuring that all accesses to shared memory are safe (synchronized). Broadly speaking, we must also ensure that our functions do the right thing.

Unnecessary synchronization

First, is synchronization necessary here?

bbuffer* bbuffer_new(void) {
    bbuffer* bb = (bbuffer*) malloc(sizeof(bbuffer));
    bb->pos = 0;
    bb->len = 0;
    bb->write_closed = 0;
    return bb;
}

No! The malloc function returns fresh, unused memory, so no other thread can access bb->pos or bb->len until the bb pointer is published somewhere (in a global or on the heap). (Unless, of course, the program is invalid and contains use-after-free or wild memory accesses, in which case all bets are off.) Local variables, and heap values that are only accessible via local variables, rarely need synchronization.

Correct synchronization

This code synchronizes correctly, using a per-bounded-buffer mutex.

typedef struct {
   char buf[BBUFFER_SIZE];
   size_t pos;
   size_t len;
   int write_closed;
   pthread_mutex_t mutex;
} bbuffer;

bbuffer* bbuffer_new(void) {
    bbuffer* bb = (bbuffer*) malloc(sizeof(bbuffer));
    bb->pos = 0;
    bb->len = 0;
    bb->write_closed = 0;
    pthread_mutex_init(&bb->mutex, NULL);
    return bb;
}

ssize_t bbuffer_write(bbuffer* bb, const char* buf, size_t sz) {
    size_t pos = 0;
    pthread_mutex_lock(&bb->mutex);
    assert(!bb->write_closed);
    while (pos < sz && bb->len < sizeof(bb->buf)) {
        size_t bb_index = (bb->pos + bb->len) % sizeof(bb->buf);
        size_t ncopy = sz - pos;
        if (ncopy > sizeof(bb->buf) - bb_index) {
            ncopy = sizeof(bb->buf) - bb_index;
        }
        if (ncopy > sizeof(bb->buf) - bb->len) {
            ncopy = sizeof(bb->buf) - bb->len;
        }
        memcpy(&bb->buf[bb_index], &buf[pos], ncopy);
        bb->len += ncopy;
        pos += ncopy;
    }
    pthread_mutex_unlock(&bb->mutex);
    if (pos == 0 && sz > 0) {
        return -1;  // try again
    } else {
        return pos;
    }
}

Similar changes are made to the read function.
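The read function is not shown above, so the following is only a plausible sketch of what a mutex-protected bbuffer_read could look like, mirroring the structure of bbuffer_write. The BBUFFER_SIZE value and the return convention (0 at end of stream, -1 for "try again") are assumptions.

```c
#include <assert.h>
#include <pthread.h>
#include <stdlib.h>
#include <string.h>
#include <sys/types.h>

#define BBUFFER_SIZE 128   // assumed size; the original value is not shown

typedef struct {
    char buf[BBUFFER_SIZE];
    size_t pos;
    size_t len;
    int write_closed;
    pthread_mutex_t mutex;
} bbuffer;

bbuffer* bbuffer_new(void) {
    bbuffer* bb = (bbuffer*) malloc(sizeof(bbuffer));
    assert(bb != NULL);
    bb->pos = 0;
    bb->len = 0;
    bb->write_closed = 0;
    pthread_mutex_init(&bb->mutex, NULL);
    return bb;
}

// Copies up to sz buffered characters into buf. Returns the number copied,
// 0 once the write side is closed and the buffer is drained, or -1 if the
// buffer is empty but more data may still arrive.
ssize_t bbuffer_read(bbuffer* bb, char* buf, size_t sz) {
    size_t pos = 0;
    pthread_mutex_lock(&bb->mutex);
    while (pos < sz && bb->len > 0) {
        size_t ncopy = sz - pos;
        if (ncopy > sizeof(bb->buf) - bb->pos) {
            ncopy = sizeof(bb->buf) - bb->pos;   // stop at the wraparound point
        }
        if (ncopy > bb->len) {
            ncopy = bb->len;                     // no more than is buffered
        }
        memcpy(&buf[pos], &bb->buf[bb->pos], ncopy);
        bb->pos = (bb->pos + ncopy) % sizeof(bb->buf);
        bb->len -= ncopy;
        pos += ncopy;
    }
    // write_closed is shared state too, so read it before unlocking.
    int closed = bb->write_closed;
    pthread_mutex_unlock(&bb->mutex);
    if (pos == 0 && sz > 0 && !closed) {
        return -1;  // try again
    } else {
        return pos;
    }
}
```

Note that the write_closed check sits inside the locked region here: unlike the tail of bbuffer_write, the tail of the read function does consult shared state.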

We chose these critical sections by wrapping all accesses to mutable shared state. In other words, we took the unsynchronized code; looked for accesses to shared state that could be modified by concurrent threads; and expanded those accesses to a contiguous region. (Here, the whole while loop and the preceding assert form a contiguous region. The last if and return statements are not part of the region since they do not access shared state.) Then we put pthread_mutex_lock at the start of the region and pthread_mutex_unlock at the end. We ensure that all paths into the region lock the mutex and that all paths out of the region unlock it. (For instance, if the body of the while loop contained a return statement, we’d put pthread_mutex_unlock immediately before it.) And that is it.

This methodology prevents undefined behavior and produces simple, coarse-grained critical sections. In many, even most, cases, it produces critical sections that are sufficient for correctness.

Thought experiment: Insufficient critical sections

Large critical sections have costs: if the entire program were in one critical section—if every thread started with pthread_mutex_lock(&giant_mutex) and ended with pthread_mutex_unlock(&giant_mutex)—then there would be no parallelism. Smaller critical sections allow more parallelism. But be careful: make critical sections too small and the program breaks.

For instance, consider this code, which breaks the body of the while loop into two critical sections.

ssize_t bbuffer_write(bbuffer* bb, const char* buf, size_t sz) {
    size_t pos = 0;
    pthread_mutex_lock(&bb->mutex);
    assert(!bb->write_closed);
    while (pos < sz && bb->len < sizeof(bb->buf)) {
        size_t bb_index = (bb->pos + bb->len) % sizeof(bb->buf);
        size_t ncopy = sz - pos;
        if (ncopy > sizeof(bb->buf) - bb_index) {
            ncopy = sizeof(bb->buf) - bb_index;
        }
        if (ncopy > sizeof(bb->buf) - bb->len) {
            ncopy = sizeof(bb->buf) - bb->len;
        }
        pthread_mutex_unlock(&bb->mutex);

        pthread_mutex_lock(&bb->mutex);
        memcpy(&bb->buf[bb_index], &buf[pos], ncopy);
        bb->len += ncopy;
        pos += ncopy;
    }
    pthread_mutex_unlock(&bb->mutex);
    if (pos == 0 && sz > 0) {
        return -1;  // try again
    } else {
        return pos;
    }
}

This is incorrect, not because of undefined behavior, but because it doesn’t perform the right function. The semantics of bbuffer_write are that characters are written in an atomic unit, and unread characters are never overwritten. To provide those semantics, the bbuffer_write function must effectively lock the buffer for its entire run. Imagine two threads calling the bad bbuffer_write implementation. The first one to lock the mutex will calculate bb_index, the location it will write its characters later. But then it unlocks the mutex! This allows other threads to run, possibly lock the mutex, and possibly modify bb->buf and bb->len. As a result, when the first thread re-locks the mutex, it might write into a position (bb_index) that was calculated using stale information, causing overwritten characters or gaps in the written data.

Condition variables

Our large critical section makes the bounded buffers correct, but not necessarily efficient. In particular, both write and read poll, rather than block. We can improve efficiency by introducing wait queues, just as we did in the kernel variant.

The pthread synchronization object that encapsulates a wait queue is called a condition variable. Abstractly, a condition variable represents a condition that threads might be interested in waiting for. When a thread wants to wait for the condition to become true, it calls the condition variable’s wait operation. To signal that the condition has become true, a thread calls the condition variable’s signal operation. There’s one important wrinkle, though. Because checking a condition generally requires examining shared state, and accessing shared state usually requires mutual exclusion, condition waiting is integrated with a mutex. This integration avoids subtle race conditions.

A pthread condition variable has type pthread_cond_t and the following operation signatures:

int pthread_cond_init(pthread_cond_t* cond, const pthread_condattr_t* attributes)
Initialize the condition variable.
int pthread_cond_wait(pthread_cond_t* cond, pthread_mutex_t* mutex)
Blocks the current thread until the condition cond is signaled. The caller must have the mutex locked. The cond_wait operation atomically blocks the current thread and unlocks the mutex (the atomicity is required to avoid race conditions). Once the condition is signaled, this thread re-locks the mutex and the cond_wait operation returns. Because POSIX allows cond_wait to return spuriously, callers normally re-check the condition in a loop.
int pthread_cond_signal(pthread_cond_t* cond)
Signals (i.e., wakes up) one thread that is blocked on cond. If there are no blocked threads, does nothing.
int pthread_cond_broadcast(pthread_cond_t* cond)
Signals (i.e., wakes up) all threads blocked on cond.
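Before applying these operations to the bounded buffer, here is a minimal sketch of the usual wait/signal pattern. The mailbox structure and function names are hypothetical; the point is that the condition (ready) is ordinary shared state protected by the mutex, and the waiter checks it in a loop.

```c
#include <assert.h>
#include <pthread.h>

// Hypothetical shared state: a value that becomes available later.
typedef struct {
    pthread_mutex_t mutex;
    pthread_cond_t ready_cond;
    int ready;      // the condition: nonzero once value has been set
    int value;
} mailbox;

static void* producer_thread(void* arg) {
    mailbox* mb = (mailbox*) arg;
    pthread_mutex_lock(&mb->mutex);
    mb->value = 61;
    mb->ready = 1;                         // make the condition true...
    pthread_mutex_unlock(&mb->mutex);
    pthread_cond_signal(&mb->ready_cond);  // ...then wake one waiter, if any
    return NULL;
}

// Waits for the producer's value and returns it.
int run_cond_demo(void) {
    mailbox mb;
    pthread_mutex_init(&mb.mutex, NULL);
    pthread_cond_init(&mb.ready_cond, NULL);
    mb.ready = 0;
    mb.value = 0;

    pthread_t tid;
    int r = pthread_create(&tid, NULL, producer_thread, &mb);
    assert(r == 0);

    pthread_mutex_lock(&mb.mutex);
    while (!mb.ready) {
        // Atomically unlocks the mutex and blocks; re-locks before returning.
        pthread_cond_wait(&mb.ready_cond, &mb.mutex);
    }
    int v = mb.value;
    pthread_mutex_unlock(&mb.mutex);

    r = pthread_join(tid, NULL);
    assert(r == 0);
    pthread_cond_destroy(&mb.ready_cond);
    pthread_mutex_destroy(&mb.mutex);
    return v;
}
```

If the producer runs first, the waiter sees ready already set and never blocks, so no wakeup can be lost. If the waiter runs first, it blocks until the signal arrives.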

Blocking bounded buffer

The bounded buffer implementation has two places that might benefit from blocking: the write implementation might want to block if the buffer is full, and the read implementation might want to block if the buffer is empty. These separate conditions naturally imply two condition variables. The nonempty condition variable is signaled when the buffer becomes nonempty (when it gains characters), and the nonfull condition variable is signaled when the buffer becomes nonfull (when it gains space). That is, the write implementation blocks on nonfull and signals nonempty, while the read implementation blocks on nonempty and signals nonfull.

Here’s a section of the code:

typedef struct { ...
    pthread_cond_t nonempty;
    pthread_cond_t nonfull;
} bbuffer;

bbuffer* bbuffer_new(void) { ...
    pthread_cond_init(&bb->nonempty, NULL);
    pthread_cond_init(&bb->nonfull, NULL);
    return bb;
}

ssize_t bbuffer_write(bbuffer* bb, const char* buf, size_t sz) {
    size_t pos = 0;
    pthread_mutex_lock(&bb->mutex);
    assert(!bb->write_closed);
    while (pos < sz) {
        size_t bb_index = (bb->pos + bb->len) % sizeof(bb->buf);
        size_t ncopy = sz - pos;
        if (ncopy > sizeof(bb->buf) - bb_index) {
            ncopy = sizeof(bb->buf) - bb_index;
        }
        if (ncopy > sizeof(bb->buf) - bb->len) {
            ncopy = sizeof(bb->buf) - bb->len;
        }
        memcpy(&bb->buf[bb_index], &buf[pos], ncopy);
        bb->len += ncopy;
        pos += ncopy;
        if (ncopy == 0) {
            if (pos > 0) {
                break;
            }
            pthread_cond_wait(&bb->nonfull, &bb->mutex);
        }
    }
    pthread_mutex_unlock(&bb->mutex);
    if (pos == 0 && sz > 0) {
        return -1;  // cannot happen
    } else {
        if (pos > 0) {
            pthread_cond_broadcast(&bb->nonempty);
        }
        return pos;
    }
}
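The matching read side is not shown, so the sketch below is one plausible way to write it under the same scheme: block on nonempty, and broadcast nonfull after freeing space. The BBUFFER_SIZE value and the helper bbuffer_close_write (standing in for whatever function shuts down the write side) are assumptions.

```c
#include <assert.h>
#include <pthread.h>
#include <stdlib.h>
#include <string.h>
#include <sys/types.h>

#define BBUFFER_SIZE 128   // assumed size; the original value is not shown

typedef struct {
    char buf[BBUFFER_SIZE];
    size_t pos;
    size_t len;
    int write_closed;
    pthread_mutex_t mutex;
    pthread_cond_t nonempty;
    pthread_cond_t nonfull;
} bbuffer;

bbuffer* bbuffer_new(void) {
    bbuffer* bb = (bbuffer*) malloc(sizeof(bbuffer));
    assert(bb != NULL);
    bb->pos = 0;
    bb->len = 0;
    bb->write_closed = 0;
    pthread_mutex_init(&bb->mutex, NULL);
    pthread_cond_init(&bb->nonempty, NULL);
    pthread_cond_init(&bb->nonfull, NULL);
    return bb;
}

// Blocks until at least one character is available or the write side is
// closed. Returns the number of characters copied; 0 means end of stream.
ssize_t bbuffer_read(bbuffer* bb, char* buf, size_t sz) {
    size_t pos = 0;
    pthread_mutex_lock(&bb->mutex);
    while (sz > 0 && bb->len == 0 && !bb->write_closed) {
        pthread_cond_wait(&bb->nonempty, &bb->mutex);
    }
    while (pos < sz && bb->len > 0) {
        size_t ncopy = sz - pos;
        if (ncopy > sizeof(bb->buf) - bb->pos) {
            ncopy = sizeof(bb->buf) - bb->pos;
        }
        if (ncopy > bb->len) {
            ncopy = bb->len;
        }
        memcpy(&buf[pos], &bb->buf[bb->pos], ncopy);
        bb->pos = (bb->pos + ncopy) % sizeof(bb->buf);
        bb->len -= ncopy;
        pos += ncopy;
    }
    pthread_mutex_unlock(&bb->mutex);
    if (pos > 0) {
        pthread_cond_broadcast(&bb->nonfull);   // space was freed for writers
    }
    return pos;
}

// Hypothetical helper: marks the write side closed and wakes blocked readers
// so they can observe end of stream.
void bbuffer_close_write(bbuffer* bb) {
    pthread_mutex_lock(&bb->mutex);
    bb->write_closed = 1;
    pthread_mutex_unlock(&bb->mutex);
    pthread_cond_broadcast(&bb->nonempty);
}
```

The broadcast in bbuffer_close_write matters: without it, a reader already blocked on nonempty would never learn that no more data is coming.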


Notes

The relaxed memory model on x86-64 machines is called Total Store Order.