Networking

Networking is the OS abstraction for computers to communicate with one another. We’re all familiar with the Internet.

It's difficult to think about computer networks without thinking about the underlying infrastructure powering them. In the old days of telephony networks, engineers and telephone operators relied on circuit switching to manage connections for telephone calls, meaning that each telephone connection occupies a physical dedicated phone line. Circuit switching was widely used over a long period of time, even during early days of modern digital computing. Circuit switching significantly underutilized resources in that an idle connection (e.g. periods in a phone conversation when neither party was actively saying anything) must also keep a phone line occupied. Extremely complex circuit switching systems were built as telephone systems expanded, but circuit switching itself is inherently not scalable.

Modern computer networks use packet switching such that computers do not rely on dedicated direct connections to communicate. The physical connections between computers are shared, and the network carries individual packets, instead of full connections. The concept of a connection now becomes an abstraction, implemented by layers of software protocols responsible for transmitting/processing packets and presented to the application software as a stream connection by the operating system. Instead of having direct, dedicated connections to computers over the network you want to talk to, modern computer networks look more like the following, where a lot of the physical infrastructure is widely shared among tens of millions of other computers making up the Internet:

internet

Thanks to packet switching and the extensive sharing of the physical infrastructure it enables, Internet becomes cheap enough to be ubiquitous in our lives today.

Packets

A packet is a unit of data sent or received over the network. Computers communicate to one another over the network by sending and receiving packets. Packets have a maximum size, so if a computer wants to send data that does not fit in a single packet, it will have to split the data to multiple packets and send them separately. Each packet contains:

Networking system calls

A networking program uses a set of system calls to send and receive information over the network. The first and foremost system call is called socket().

The returned file descriptor is non-connected -- it has just been initialized but it is neither connected to the network nor backed up by any files or pipes. You can think of socket() as merely reserving kernel state for a future network connection.

Recall how we connect two processes using pipes. There is a parent process which is responsible for setting everything up (calling pipe(), fork(), dup2(), close(), etc.) before the child process gets to run a new program by calling execvp(). This approach clearly doesn't work here, because there is no equivalent of such "parent process" when it comes to completely different computers trying to communicate with one another. Therefore a connection must be set up using a different process with different abstractions.

In network connections, we introduce another pair of abstractions: a client and a server.

Client- and server-sides use different networking system calls.

Client-side system call

Server-side system calls

On the server side things get a bit more complicated. There are 3 system calls:

The server is not ready to accept incoming connections until after calling listen(). It means that before the server calls listen() all incoming connection requests from the clients will fail.

Among all these system calls mentioned above, only connect() and accept() involves actual communication over the network, all other calls simply manipulate local state. So only connect() and accept() system calls can block.

One interesting distinction between pipes are sockets is that pipes are one way, but sockets are two-way: one can only read from the read end of the pipe and write to the write end of the pipe, but one are free to both read and write from a socket. Unlike regular file descriptors for files opened in Read/Write mode, writing to a socket sends data to the network, and reading from the same socket will receive data from the network. Sockets hence represents a two-way connection between the client and the server, they only need to establish one connect to communicate back and forth.

Connections

A connection is an abstraction built on top of raw network packets. It presents an illusion of a reliable data stream between two endpoints of the connection. Connections are set up in phases, again by sending packets.

Here we are describing the Transmission Control Protocol (TCP). There are other networking protocols that do not use the notion of a connection and deals with packets directly. Google "User Datagram Protocol" or simply "UDP" for more information.

A connection is established by what is known as a three-way handshake process. The client initiates the connection request using a network packet, and then the server and the client exchange one round of acknowledgment packets to establish the connection. This process is illustrated below.

tcp 3 way handshake

Once the connection is established, the client and the server can exchange data using the connection. The connection provides an abstraction of a reliable data stream, but at a lower level data are still sent in packets. The networking protocol also performs congestion control: the client would send some data, wait for an acknowledgment from the server, and then send more data, and wait for another acknowledgment. The acknowledgment packets are used by the protocol as indicators of the condition of the network. The the network suffers from high packet loss rate or high delay due to heavy traffic, the protocol will lower the rate at which data are sent to alleviate congestion. The following diagram shows an example of the packet exchange between the client and the server using HTTP over an established connection.

tcp http connection

WeensyDB

We now look at a simple network database, WeensyDB, to see how networking and synchronization are integrated in the same program. A database is a program that stores data. In our case the WeensyDB application has a server side and a client side. The server side is where data is actually stored, and the client simply queries the database over the network and retrieves data from the server.

Version 1: Single-threaded server

The first version of the server-side database is in synch4/weensydb-01.cc. We use a simple hash table to represent our database. The hash table has 1024 buckets, and each bucket is a list of elements to handle collisions.

The handle_connection() function performs most of the server-side logic given a established connection from a client. It reads from the network and parses the command sent by the client. It then processes the command by accessing the database (hash table), and then writes the result of the command back to the network connection so the client can receive it.

We can run this database application by running both the server and the client. Note that merely starting the server does not generate any packets over the network -- it's a purely local operation that simply prepares the listening socket and put it into the appropriate state. After we start the client and type in a command, we do see packets being sent over the network, as a result an active connection being established and data being exchanged.

The example we showed in the lecture had both the client and the server running on the same computer. The packets were exchanged through the local loop-back network interface and nothing gets actually sent over the wire (or WiFi). For understanding you can just imagine that the two programs are running on distinct machines. In fact the two programs do not use anything other than the network to communicate, so it is very much like they operate from different physical machines.

The database server should serve any many clients as possible, without one client being able to interfere with other clients' connections. This is like the process isolation properties provided by the OS kernel. Our first version of WeensyDB doesn't actually provide this property. As we see in its implementation -- it is a single-threaded server. It simply can't handle two client connections concurrently, the clients must wait to be served one by one.

This opens up door for possible attacks. A malicious client who never closes its connection to the server will block all other clients from making progress. synch4/wdbclientloop.cc contains such a bad behaving client, and when we use it with our first version of WeensyDB we observe this effect.

Version 2: Handle connection in a new thread

The next version of the WeensyDB server tries to solve this problem using multi-threading. The program is in synch4/weensydb-02.cc. It's very similar to the previous version, except that it handles a client connection in a new thread. Relevant code is shown below:

int main(int argc, char** argv) {
    ...

    while (true) {
        // Accept connection on listening socket
        int cfd = accept(fd, nullptr, nullptr);
        if (cfd < 0) {
            perror("accept");
            exit(1);
        }

        // Handle connection
        std::thread t(handle_connection, cfd);
        t.detach();
    }
}

This code no longer blocks the main thread while a client connection is being handled, so concurrent client connections can proceed in parallel. Great! Does that really work though?

Now look at how we actually handle the client connection, in the while loop handle_connection() function:

void handle_connection(int cfd) {
    ...
    while (fgets(buf, BUFSIZ, fin)) {
        if ... {
            ...
        } else if (sscanf(buf, "set %s %zu ", key, &sz) == 2) {
            // find item; insert if missing
            auto b = string_hash(key) % NBUCKETS;
            auto it = hfind(hash[b], key);
            if (it == hash[b].end()) {
                it = hash[b].insert(it, hash_item(key));
            }

            // set value
            it->value = std::string(sz, '\0');
            fread(it->value.data(), 1, sz, fin);
            fprintf(f, "STORED %p\r\n", &*it);
            fflush(f);
        } else if ...
    }
    ...
}

We see that while handling a set operation, we modify the underlying hash table, which is shared among all threads created by parallel client connections. And these modifications are not synchronized at all! Indeed if we turn on thread sanitizer we see a lot of complaints indicating serious race conditions in our code when clients issue set commands in parallel.

We have been through this before when dealing with bounded buffers, so we know we need to fix this using a mutex. But we can't just simply protect the entire while loop in question using a scoped lock, as we did for the bounded buffer. If we did that, the program would be properly synchronized, but it loses parallelism -- the mutex ensures that only one connection can be handled at a time, and we fall back to the single-threaded version which is subject to attack by the bad client.

The key to using mutex in a networked program is to realize the danger of holding the mutex while blocking on network system calls. Blocking can happen when sending/receiving data on the network, so during network communications the mutex must never be locked by the server thread.

One way to re-organize the program to avoid these issues is to make sure the mutex is only locked after a complete request has been received from the client, and is unlocked before sending the response. In the weensydb-02.cc code it is equivalent to put one scoped lock within each if/else if/else block in the while loop in handle_connection().

Version 3: Fine-grained locking

By this point we are still doing coarse-grained locking in that the entire hash table is protected by one mutex. In a hash table, it's natural to assign a mutex to each hash table bucket to achieve fine-grained locking, because different hash table buckets are guaranteed to contain different keys. synch4/weensydb-05.cc implements this version.

Version 4: Exchange

Now consider we are to add a new feature to the database: an exchange command that swaps the values of two keys. We still follow the previous fine- grained locking design, but this time we will need to lock up to 2 buckets for every operation. When we lock multiple mutexes in a program we should always be wary of the possibility of deadlocks! The relevant code enabling this command is in weensydb-06.cc, which is also shown below:

        } else if (sscanf(buf, "exch %s %s ", key, key2) == 2) {
            // find item
            auto b1 = string_hash(key) % NBUCKETS;
            auto b2 = string_hash(key2) % NBUCKETS;
            hash_mutex[b1].lock();
            hash_mutex[b2].lock();
            auto it1 = hfind(hash[b1], key);
            auto it2 = hfind(hash[b2], key2);

            // exchange items
            if (it1 != hash[b1].end() && it2 != hash[b2].end()) {
                std::swap(it1->value, it2->value);
                fprintf(f, "EXCHANGED %p %p\r\n", &*it1, &*it2);
            } else {
                fprintf(f, "NOT_FOUND\r\n");
            }
            fflush(f);

            hash_mutex[b1].unlock();
            hash_mutex[b2].unlock();
            
        } else if (remove_trailing_whitespace(buf)) {
            fprintf(f, "ERROR\r\n");
            fflush(f);
        }
    }

The two lines

        hash_mutex[b1].lock();
        hash_mutex[b2].lock();

smell like deadlock. This code deadlocks if the two keys simply hash to the same bucket!

Changes were made in synch4/weensydb-07.cc to address this issue. The two lines above are now replaced by the following (albeit slightly more complex) code:

        hash_mutex[b1].lock();
        if (b1 != b2) {
            hash_mutex[b2].lock();
        }

There is still risk of deadlock as we are not locking the mutexes in a fixed global order. In many cases we can handily represent this global order by something we already keep track of in our code. In this example the bucket numbers b1 and b2 are natural indicators of such a global order. Another commonly used order is based on the memory address values of the mutex objects themselves. synch4/weensydb-08.cc shows a locking order based on hash bucket numbers:

        if (b1 > b2) {
            hash_mutex[b2].lock();
            hash_mutex[b1].lock();
        } else if (b1 == b2) {
            hash_mutex[b1].lock();
        } else {
            hash_mutex[b1].lock();
            hash_mutex[b2].lock();
        }

Servers and the "C10K" problem

"C10K" stands for handling 10,000 connections at a time. Back at the time when the Internet just came into being this is considered a "large-scale" problem. In today's scale the problem is more like "C10M" or "C10B" -- building servers or server architectures that can handle tens of millions (or even billions) of connections at the same time.

Early server software spawns a thread for each incoming connection. This means to handle 10,000 connections at once, the server must run 10,000 threads at the same time. Every thread has its own set of registers (stored in the kernel) and its own stack (takes up user-space memory). Each thread also has a control block (called a task struct in Linux) managed by the kernel. These resources add up to some very significant per-thread space overhead:

10,000 thread stacks alone add up to ~ 80GB of memory! It is clearly not scalable. Is it possible to handle 10,000 connections with just one thread?

It is possible to handle 10,000 mostly idle connections with just one thread. What we need to make sure is that whenever a message comes through in one of these connections, it is timely handled.

Conventional system calls like read() and write() block, so they prevent us from using a single thread to handle concurrent connections, because our single thread will just be blocked on an idle connection. The solution to this problem is to not use blocking: we need non-blocking I/O!

Non-blocking I/O: Handle more than 1 connection per thread

read() and write() system calls can in fact be configured to operate in non-blocking mode. Where a normal read() or write() call would block, the non-blocking version would simply return -1 with errno set to EAGAIN. Let's see how we can use non-blocking I/O to implement a beefy thread that can handle 10,000 connections at once:

int cfd[10000]; // 10K socket fds representing 10K connections

int main(...) {
    ...
    while (true) {
        for (int c : cfd) {
            int r = read(c);
            if (r < 0)
                continue;

            // handle message
            ...

            r = write(c);
            if (r < 0) {
                // queue/buffer response
                continue;
            }
        }
    }
}

This style of programming using non-blocking I/O is also called event-driven programming. Of course, in order to handle this many connections within one thread, we will have to maintain more state in the user level to do bookkeeping for each connection. For example, when we read from a connection and it didn't return a message, we need to somehow mark that we are still awaiting a message from this connection. Likewise, when a write() system calls fails due to EAGAIN while sending a response, the server will need to temporarily store the response message somewhere in memory and queue it for delivery when the network becomes available. These states are not required when we use blocking I/O because the server processes everything in order. A non-blocking server can essentially handle connections out-of-order within a single thread. Nevertheless these user-level overhead is much lower than the overhead of maintaining thousands of threads.

The server program described above has one significant problem: it does polling. Polling means high CPU utilization, which means the server runs hot and consumes a lot of power! It would be ideal if we can block when there is really no messages to be processes, but wake up as soon as something becomes available. The select() system call allows us to do exactly that.

select(): Block on multiple fds to avoid polling

Recall the signature of the select() system call:

select(nfds, rfds, wfds, xfds, timeout)
             ^ read set
                   ^ write set
                         ^ exception set

The select() system call blocks until either the following occurs:

We can solve the problem of polling by calling select() after the inner for loop in the server program above:

int cfd[10000]; // 10K socket fds representing 10K connections

int main(...) {
    ...
    while (true) {
        for (int c : cfd) {
            ...
        }
        select(...);
    }
}

We need to add all fds in the cfd[] array to the read set passed to select(). Now it should solve our problem!

But there is still a lot of overhead in this program. Imagine among the 10,000 connections that are established, most of them are idle and we only have one connection that are active at a time. In order to find that one active connection, the for loop has to search through the entire array. And that's just the overhead in the user space. The select() system call itself has O(N) performance, where N is the number of fds in its read set. For these reasons, this code still results in high CPU usage even most connections are idle. It would be nice if we don't have to do any of this extra work when we just have a single interesting connection to handle.

Our problem here is that select() does work for each connection in its read set on every call. We can solve this problem by maintaining a persistent read set for connections in the kernel, and a blocking system call that returns only active connections when waking up. This is in comparison to the interface of select(), where the read set is passed in as a parameter, and system call API itself is stateless. This API design is the reason why select() must do O(N) work.

The Linux version of this system call is called epoll.

epoll: Maintaining a persistent read set in the kernel

There are three system calls in the epoll system call family:

This is an example of improved performance by having an alternative design of the interface. By moving the read set to be persistently maintained in the kernel, the interface looks drastically different from select albeit achieving the same functionalities. This enables much better per-call performance when the number of active fds is small but the read set itself is large. The nginx web server makes extensive use of the epoll system calls to handle multiple connections per thread.

Here are some latency numbers on select() vs. epoll_wait(), based on 100K calls to a server (in microseconds, lowers are better):

number of fds select() epoll_wait()
10 0.73 0.41
100 3.0 0.42
10000 930 0.66

This O(N) vs. O(1) difference is achieved by an interface change. Identifying issues in interface designs and addressing them by proposing alternative interfaces is one of the goals of systems research.