I was reading this article by Marek Majkowski recently, and a few terms went over my head. So I had fun digging and yak-shaving, and this blog post is the result. My aim here is to collect enough ammunition in one place to tackle that post with ease. Let’s go…

I/O Models

The problem we are trying to solve is juggling multiple I/O jobs at the same time. How do we know which file descriptor is ready to read, ready to write, or has errored out?

Sure, we can loop over the descriptors and check whether each one is ready (non-blocking). Or we can wait indefinitely on one descriptor and ignore the rest until it is ready (blocking).
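To make the difference concrete, here is a minimal sketch of a non-blocking read. It assumes a Unix-like system and uses socket.socketpair() as a stand-in for a real network connection; try_recv and demo are names made up for this illustration.

```python
import socket

def try_recv(sock):
    """Return the data if some is available, or None if the read would block."""
    try:
        return sock.recv(1024)
    except BlockingIOError:
        # Non-blocking mode: the kernel returns immediately instead of waiting.
        return None

def demo():
    a, b = socket.socketpair()
    try:
        b.setblocking(False)
        before = try_recv(b)   # nothing has been sent yet
        a.send(b"ping")
        after = try_recv(b)    # data is now queued on b
        return before, after
    finally:
        a.close()
        b.close()

if __name__ == "__main__":
    print(demo())
```

With b left in blocking mode, the first recv would simply hang until the peer sent something, which is exactly the "forget the rest" behaviour described above.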

Blocking I/O

Unix Network Programming Fig 6.1

Non Blocking I/O

Unix Network Programming Fig 6.2

But there are more I/O models out there:

I/O Multiplexing Model

Unix Network Programming Fig 6.3

As we can see, in I/O multiplexing the process blocks in a call such as select, poll, or epoll_wait, and the kernel wakes it up when some I/O is ready. The call returns the file descriptors that are ready for reading, ready for writing, or in an exceptional condition.
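A rough sketch of that flow, assuming a Unix-like system: we hand select two sockets, only one of which has data, and the call returns just the ready one. The socket pairs stand in for real client connections, and read_ready and demo are hypothetical helper names.

```python
import select
import socket

def read_ready(socks, timeout=1.0):
    """Block in select() until some socket is readable, then read from each ready one."""
    readable, _, _ = select.select(socks, [], [], timeout)
    return [s.recv(1024) for s in readable]

def demo():
    a1, b1 = socket.socketpair()
    a2, b2 = socket.socketpair()
    try:
        # Only the second pair has data waiting, so only b2 is reported ready.
        a2.send(b"hello")
        return read_ready([b1, b2])
    finally:
        for s in (a1, b1, a2, b2):
            s.close()

if __name__ == "__main__":
    print(demo())
```

The process blocks once, on select, instead of blocking on each socket in turn.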

Signal-Driven I/O Model

Unix Network Programming Fig 6.4

In the signal-driven I/O model, we ask the kernel to notify us with the SIGIO signal when the descriptor is ready.
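Here is a small sketch of what that can look like from Python. It assumes Linux, where os.O_ASYNC and fcntl.F_SETOWN are available, and it must run in the main thread because that is where Python delivers signals. sigio_demo is a made-up name, and the socket pair stands in for a real connection.

```python
import fcntl
import os
import signal
import socket
import time

def sigio_demo(timeout=2.0):
    a, b = socket.socketpair()
    received = []

    def on_sigio(signum, frame):
        # The kernel told us b is ready, so the read should not block.
        try:
            received.append(b.recv(1024))
        except BlockingIOError:
            pass

    old_handler = signal.signal(signal.SIGIO, on_sigio)
    try:
        b.setblocking(False)
        # Deliver SIGIO for this descriptor to our own process.
        fcntl.fcntl(b, fcntl.F_SETOWN, os.getpid())
        # Turn on signal-driven I/O for the descriptor.
        flags = fcntl.fcntl(b, fcntl.F_GETFL)
        fcntl.fcntl(b, fcntl.F_SETFL, flags | os.O_ASYNC)

        a.send(b"ping")
        # The process is free to do other work here; we just idle briefly.
        deadline = time.monotonic() + timeout
        while not received and time.monotonic() < deadline:
            time.sleep(0.01)
        return received
    finally:
        # Close b first so no further SIGIO is generated, then restore the handler.
        b.close()
        a.close()
        signal.signal(signal.SIGIO, old_handler or signal.SIG_DFL)

if __name__ == "__main__":
    print(sigio_demo())
```

The process never blocks waiting for readiness; the read happens inside the signal handler once the kernel says the descriptor is ready.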

Asynchronous I/O Model

Unix Network Programming Fig 6.5

I/O Multiplexing

select(2)

Let’s start by looking at the simplest and earliest I/O multiplexing interface, select.

man select

Here’s a simple TCP server using select:

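import socket
import select
import os

SERVER_HOST = 'localhost'
SERVER_PORT = 8080
MAX_CLIENTS = 10   # used as the listen() backlog
NUM_WORKERS = 100

def child_process(client_sockets, master_socket, worker_id):
    # Each worker gets its own copy of client_sockets after fork().
    while True:
        # Block until at least one socket is readable.
        read_sockets, _, _ = select.select(client_sockets, [], [])

        for sock in read_sockets:
            if sock is master_socket:
                try:
                    client_socket, addr = master_socket.accept()
                except BlockingIOError:
                    # Another worker accepted this connection first.
                    continue
                client_socket.setblocking(False)
                print(f"New connection {addr} in process {worker_id}")
                client_sockets.append(client_socket)
            else:
                try:
                    data = sock.recv(1024)
                except BlockingIOError:
                    continue
                except ConnectionError:
                    data = b''
                if not data:
                    # Peer closed the connection (or it was reset).
                    client_sockets.remove(sock)
                    sock.close()
                else:
                    # Echo back; a partial send is possible on a non-blocking socket.
                    sock.send(data)

if __name__ == '__main__':

    master_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    master_socket.bind((SERVER_HOST, SERVER_PORT))
    master_socket.listen(MAX_CLIENTS)
    master_socket.setblocking(False)

    client_sockets = [master_socket]

    for i in range(NUM_WORKERS):
        pid = os.fork()
        if pid == 0:
            child_process(client_sockets, master_socket, i)
            os._exit(0)

    # Wait for every worker, not just the first one to exit.
    for _ in range(NUM_WORKERS):
        os.wait()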

select has a few issues though:

epoll(7)

man 7 epoll

Scenario:

(1)  The file descriptor that represents the read side of a pipe
     (rfd - read file descr.) is registered on the epoll instance.

(2)  A pipe writer writes 2 kB of data on the write side of the
     pipe.

(3)  A call to epoll_wait(2) is done that will return rfd as a
     ready file descriptor.

(4)  The pipe reader reads 1 kB of data from rfd.

(5)  A call to epoll_wait(2) is done.
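The man page uses this scenario to contrast level-triggered and edge-triggered notification. The sketch below replays the five steps from Python on Linux, once in each mode, and reports whether each epoll_wait call returned an event. The name pipe_scenario and the 0.2 second timeout (used so that step 5 returns instead of hanging) are choices made for this illustration.

```python
import os
import select

def pipe_scenario(edge_triggered):
    """Replay steps (1)-(5); return (first wait had events, second wait had events)."""
    rfd, wfd = os.pipe()
    ep = select.epoll()
    try:
        flags = select.EPOLLIN
        if edge_triggered:
            flags |= select.EPOLLET
        ep.register(rfd, flags)          # (1) register the read side
        os.write(wfd, b"x" * 2048)       # (2) writer writes 2 kB
        first = ep.poll(1)               # (3) rfd is reported ready
        os.read(rfd, 1024)               # (4) reader consumes only 1 kB
        second = ep.poll(0.2)            # (5) 1 kB is still unread
        return bool(first), bool(second)
    finally:
        ep.close()
        os.close(rfd)
        os.close(wfd)

if __name__ == "__main__":
    print("level-triggered:", pipe_scenario(False))
    print("edge-triggered: ", pipe_scenario(True))
```

In level-triggered mode the second wait reports rfd again because data is still buffered. In edge-triggered mode it times out, since no new write has arrived since the last notification; without a timeout the call would block even though 1 kB is waiting to be read.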

Here’s the same sample with epoll in Python. (Exercise: do this in C to understand the specification in more depth.)


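import socket
import select
import os

# To avoid the thundering-herd problem. This requires kernel 4.5+.
# Python 3.6+ also exposes this flag as select.EPOLLEXCLUSIVE.
EPOLLEXCLUSIVE = 1 << 28

NUM_WORKERS = 3

def child_process(epoll, sock, worker_id):
    while True:
        try:
            # Block until the listening socket becomes readable.
            epoll.poll()
        except OSError:
            continue
        # Drain the accept queue: keep accepting until it would block.
        while True:
            try:
                conn, _ = sock.accept()
            except OSError:
                # Usually BlockingIOError: no more pending connections.
                break
            except Exception as e:
                print(f"Some other exception: {e}")
                break
            print(f"worker {worker_id}")
            # Close each accepted connection, not just the last one.
            conn.close()

if __name__ == '__main__':

    # Create and bind socket
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    sock.bind(('127.0.0.1', 8080))
    sock.listen(10)
    sock.setblocking(False)


    # Fork child processes
    for i in range(NUM_WORKERS):
        pid = os.fork()
        if pid == 0:
            # Create epoll instance (one per worker)
            epoll = select.epoll()
            epoll.register(sock, select.EPOLLIN | EPOLLEXCLUSIVE)
            child_process(epoll, sock, i)
            os._exit(0)

    # Wait for every worker, not just the first one to exit.
    for _ in range(NUM_WORKERS):
        os.wait()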

Closing Notes: