fork, select, poll, epoll, io_uringのecho server
fork, select, poll, epoll, io_uringなどを使用してそれぞれecho serverを実装したのでそれぞれの仕様などを忘備録としてまとめていきます。エラー処理とか雑な部分があると思いますがご容赦ください。
環境構築
macOS上での実行を前提にします。io_uringを使う場合にはlinux kernel 5.1以上でないと動かないので、multipassというubuntuのVMを手軽に作成できるツールを使って実行環境を構築します。
multipassのインストール
brew install --cask multipass
20.10のubuntuを用意して、gccやio_uringのライブラリであるliburingを入れます。
multipass launch 20.10 -n primary multipass shell # login sudo apt update -y sudo apt install gcc liburing -y uname -a > Linux primary 5.8.0-43-generic #49-Ubuntu SMP Fri Feb 5 03:01:28 UTC 2021 x86_64 x86_64 x86_64 GNU/Linux
fork
forkを使用したサーバでは、1クライアントに1プロセスが対応することになり、メモリが枯渇して性能に余裕があるのにもかかわらずレスポンス性能が大きく落ちます。
有名なC10K問題を引き起こします。
C10K問題(英語: C10K problem)とは、Apache HTTP ServerなどのWebサーバソフトウェアとクライアントの通信において、クライアントが約1万台に達すると、Webサーバーのハードウェア性能に余裕があるにも関わらず、レスポンス性能が大きく下がる問題である。
#include <arpa/inet.h> #include <signal.h> #include <stdio.h> #include <stdlib.h> #include <string.h> #include <sys/socket.h> #include <unistd.h> #define BACKLOG_SIZE 5 #define BUF_SIZE 1024 #define INF_TIME -1 #define DISABLE -1 int listen_fd; void int_handle(int n) { close(listen_fd); exit(EXIT_SUCCESS); } // wirte n byte ssize_t write_n(int fd, char *ptr, size_t n) { ssize_t n_left = n, n_written; while (n_left > 0) { if ((n_written = write(fd, ptr, n_left)) <= 0) { return n_written; } n_left -= n_written; ptr += n_written; } return EXIT_SUCCESS; } int main(int argc, char **argv) { // Create listen socket if ((listen_fd = socket(AF_INET, SOCK_STREAM, 0)) < 0) { fprintf(stderr, "Error: socket\n"); return EXIT_FAILURE; } // TCP port number int port = 8080; // Initialize server socket address struct sockaddr_in server_addr; memset(&server_addr, 0, sizeof(server_addr)); server_addr.sin_family = AF_INET; server_addr.sin_addr.s_addr = INADDR_ANY; server_addr.sin_port = htons(port); // Bind socket to an address if (bind(listen_fd, (struct sockaddr *)&server_addr, sizeof(server_addr)) < 0) { fprintf(stderr, "Error: bind\n"); return EXIT_FAILURE; } // Listen if (listen(listen_fd, BACKLOG_SIZE) < 0) { fprintf(stderr, "Error: listen\n"); return EXIT_FAILURE; } // Set INT signal handler signal(SIGINT, int_handle); fprintf(stderr, "listen on port %d\n", port); while (1) { // Check new connection struct sockaddr_in client_addr; socklen_t len_client = sizeof(client_addr); int conn_fd; if ((conn_fd = accept(listen_fd, (struct sockaddr *)&client_addr, &len_client)) < 0) { fprintf(stderr, "Error: accept\n"); return EXIT_FAILURE; } printf("Accept socket %d (%s : %hu)\n", conn_fd, inet_ntoa(client_addr.sin_addr), ntohs(client_addr.sin_port)); pid_t pid = fork(); if (pid < 0) { fprintf(stderr, "Error: fork\n"); return EXIT_FAILURE; } if (pid == 0) { // child char buf[BUF_SIZE]; close(listen_fd); while (1) { ssize_t n = read(conn_fd, buf, BUF_SIZE); if (n < 0) { fprintf(stderr, "Error: read from socket %d\n", conn_fd); close(conn_fd); exit(-1); } else if (n == 0) { // connection closed by client printf("Close socket %d\n", conn_fd); close(conn_fd); exit(0); } else { printf("Read %zu bytes from socket %d\n", n, conn_fd); write_n(conn_fd, buf, n); } } } else { // parent close(conn_fd); } } close(listen_fd); return EXIT_SUCCESS; }
I/O多重化
上記のようなマルチプロセスにより起きるC10K問題を解決する方法として、クライアント数にかかわらず1プロセスで処理するイベント駆動型プログラミングが提案されています。
イベント駆動型プログラミング(イベントくどうがたプログラミング、英: event-driven programming)は、コンピュータプログラムが起動すると共にイベントを待機し、発生したイベントに従って受動的に処理を行うプログラミングパラダイムのこと。
以降はI/O多重化によるecho serverを実装していきます。I/O多重化とは複数のI/Oデバイスの状態を同時に監視する仕組みです。例えば1プロセスで複数のクライアントを制御するためにこの技術は使用されます。
select
ディスクリプタを線形に探索する必要があるので計算量がO(n)かかります。管理できるディスクリプタ数に上限があるのが特徴です。
#include <arpa/inet.h> #include <errno.h> #include <signal.h> #include <stdio.h> #include <stdlib.h> #include <string.h> #include <sys/select.h> #include <sys/socket.h> #include <unistd.h> #define BACKLOG_SIZE 5 #define BUF_SIZE 1024 #define N_CLIENT 256 #define INF_TIME -1 #define DISABLE -1 int listen_fd; void int_handle(int n) { close(listen_fd); exit(EXIT_SUCCESS); } // wirte n byte ssize_t write_n(int fd, char *ptr, size_t n) { ssize_t n_left = n, n_written; while (n_left > 0) { if ((n_written = write(fd, ptr, n_left)) <= 0) { return n_written; } n_left -= n_written; ptr += n_written; } return EXIT_SUCCESS; } int main(int argc, char **argv) { char buf[BUF_SIZE]; fd_set fds; FD_ZERO(&fds); int clients[N_CLIENT]; for (int i = 0; i < N_CLIENT; i++) { clients[i] = DISABLE; } memset(&fds, 0, sizeof(fds)); // Create listen socket if ((listen_fd = socket(AF_INET, SOCK_STREAM, 0)) < 0) { fprintf(stderr, "Error: socket\n"); return EXIT_FAILURE; } // Set INT signal handler signal(SIGINT, int_handle); // TCP port number int port = 8080; // Initialize server socket address struct sockaddr_in server_addr; memset(&server_addr, 0, sizeof(server_addr)); server_addr.sin_family = AF_INET; server_addr.sin_addr.s_addr = INADDR_ANY; server_addr.sin_port = htons(port); // Bind socket to an address if (bind(listen_fd, (struct sockaddr *)&server_addr, sizeof(server_addr)) < 0) { fprintf(stderr, "Error: bind\n"); return EXIT_FAILURE; } // Listen if (listen(listen_fd, BACKLOG_SIZE) < 0) { fprintf(stderr, "Error: listen\n"); return EXIT_FAILURE; } fprintf(stderr, "listen on port %d\n", port); FD_SET(listen_fd, &fds); int max_fd = listen_fd; // max fd int max_i = 0; // max client into clients[] array while (1) { FD_ZERO(&fds); FD_SET(listen_fd, &fds); for (int i = 0; i < N_CLIENT; i++) { if (clients[i] != DISABLE) { FD_SET(clients[i], &fds); } } int res_select = select(max_fd + 1, &fds, NULL, NULL, NULL); if (res_select < 0) { fprintf(stderr, "Error: select"); return EXIT_FAILURE; } // Check new connection if (FD_ISSET(listen_fd, &fds)) { struct sockaddr_in client_addr; socklen_t len_client = sizeof(client_addr); int connfd; if ((connfd = accept(listen_fd, (struct sockaddr *)&client_addr, &len_client)) < 0) { fprintf(stderr, "Error: accept\n"); return EXIT_FAILURE; } printf("Accept socket %d (%s : %hu)\n", connfd, inet_ntoa(client_addr.sin_addr), ntohs(client_addr.sin_port)); // Save client socket into clients array int i; for (i = 0; i < N_CLIENT; i++) { if (clients[i] == DISABLE) { clients[i] = connfd; break; } } // No enough space in clients array if (i == N_CLIENT) { fprintf(stderr, "Error: too many clients\n"); close(connfd); } if (i > max_i) { max_i = i; } if (connfd > max_fd) { max_fd = connfd; } } // Check all clients to read data for (int i = 0; i <= max_i; i++) { int sock_fd; if ((sock_fd = clients[i]) == DISABLE) { continue; } // If the client is readable or errors occur ssize_t n = read(sock_fd, buf, BUF_SIZE); if (n < 0) { fprintf(stderr, "Error: read from socket %d\n", sock_fd); close(sock_fd); clients[i] = DISABLE; } else if (n == 0) { // connection closed by client printf("Close socket %d\n", sock_fd); close(sock_fd); clients[i] = DISABLE; } else { printf("Read %zu bytes from socket %d\n", n, sock_fd); write_n(sock_fd, buf, n); write_n(1, buf, n); } } } close(listen_fd); return EXIT_SUCCESS; }
poll
selectとほとんど同じ機能であり、計算量も同じO(n)だけかかります。ただし、管理するディスクリプタの制限がない点で上位互換と捉えることができます。
#include <arpa/inet.h> #include <errno.h> #include <poll.h> #include <signal.h> #include <stdio.h> #include <stdlib.h> #include <string.h> #include <sys/socket.h> #include <unistd.h> #define BACKLOG_SIZE 5 #define BUF_SIZE 1024 #define N_CLIENT 256 #define INF_TIME -1 #define DISABLE -1 int listen_fd; void int_handle(int n) { close(listen_fd); exit(EXIT_SUCCESS); } // wirte n byte ssize_t write_n(int fd, char *ptr, size_t n) { ssize_t n_left = n, n_written; while (n_left > 0) { if ((n_written = write(fd, ptr, n_left)) <= 0) { return n_written; } n_left -= n_written; ptr += n_written; } return EXIT_SUCCESS; } int main(int argc, char **argv) { char buf[BUF_SIZE]; struct pollfd clients[N_CLIENT]; // Create listen socket if ((listen_fd = socket(AF_INET, SOCK_STREAM, 0)) < 0) { fprintf(stderr, "Error: socket\n"); return EXIT_FAILURE; } // TCP port number int port = 8080; // Initialize server socket address struct sockaddr_in server_addr; memset(&server_addr, 0, sizeof(server_addr)); server_addr.sin_family = AF_INET; server_addr.sin_addr.s_addr = INADDR_ANY; server_addr.sin_port = htons(port); // Bind socket to an address if (bind(listen_fd, (struct sockaddr *)&server_addr, sizeof(server_addr)) < 0) { fprintf(stderr, "Error: bind\n"); return EXIT_FAILURE; } // Listen if (listen(listen_fd, BACKLOG_SIZE) < 0) { fprintf(stderr, "Error: listen\n"); return EXIT_FAILURE; } // Set INT signal handler signal(SIGINT, int_handle); fprintf(stderr, "listen on port %d\n", port); clients[0].fd = listen_fd; clients[0].events = POLLIN; for (int i = 1; i < N_CLIENT; i++) { clients[i].fd = DISABLE; } int max_i = 0; // max index into clients[] array while (1) { int n_ready = poll(clients, max_i + 1, INF_TIME); // Time out if (n_ready == 0) { continue; } // Error poll if (n_ready < 0) { fprintf(stderr, "Error: poll %d\n", errno); return errno; } // Check new connection if (clients[0].revents & POLLIN) { struct sockaddr_in client_addr; socklen_t len_client = sizeof(client_addr); int connfd; if ((connfd = accept(listen_fd, (struct sockaddr *)&client_addr, &len_client)) < 0) { fprintf(stderr, "Error: accept\n"); return EXIT_FAILURE; } printf("Accept socket %d (%s : %hu)\n", connfd, inet_ntoa(client_addr.sin_addr), ntohs(client_addr.sin_port)); // Save client socket into clients array int i; for (i = 0; i < N_CLIENT; i++) { if (clients[i].fd == DISABLE) { clients[i].fd = connfd; break; } } // No enough space in clients array if (i == N_CLIENT) { fprintf(stderr, "Error: too many clients\n"); close(connfd); } clients[i].events = POLLIN; if (i > max_i) { max_i = i; } } // Check all clients to read data for (int i = 1; i <= max_i; i++) { int sock_fd; if ((sock_fd = clients[i].fd) == DISABLE) { continue; } // If the client is readable or errors occur if (clients[i].revents & (POLLIN | POLLERR)) { ssize_t n = read(sock_fd, buf, BUF_SIZE); if (n < 0) { fprintf(stderr, "Error: read from socket %d\n", sock_fd); close(sock_fd); clients[i].fd = DISABLE; } else if (n == 0) { // connection closed by client printf("Close socket %d\n", sock_fd); close(sock_fd); clients[i].fd = DISABLE; } else { printf("Read %zu bytes from socket %d\n", n, sock_fd); write_n(sock_fd, buf, n); write_n(1, buf, n); } } } } close(listen_fd); return EXIT_SUCCESS; }
epoll
linux kernel 2.6以降であれば使用可能なAPIです。
ディスクリプタの状態をカーネル空間で監視しているため、直接変更のあるディスクリプタを返してくれるので計算量がO(1)となり高速に処理できます。
ただし、ディスクリプタが1の場合はepollにオーバーヘッドが存在して、pollの方が早い場合もあるので注意が必要です。
#include <arpa/inet.h> #include <errno.h> #include <signal.h> #include <stdio.h> #include <stdlib.h> #include <string.h> #include <sys/epoll.h> #include <sys/socket.h> #include <unistd.h> #define BACKLOG_SIZE 5 #define BUF_SIZE 1024 #define N_CLIENT 256 #define INF_TIME -1 #define DISABLE -1 int listen_fd; void int_handle(int n) { close(listen_fd); exit(EXIT_SUCCESS); } // wirte n byte ssize_t write_n(int fd, char *ptr, size_t n) { ssize_t n_left = n, n_written; while (n_left > 0) { if ((n_written = write(fd, ptr, n_left)) <= 0) { return n_written; } n_left -= n_written; ptr += n_written; } return EXIT_SUCCESS; } int main(int argc, char **argv) { char buf[BUF_SIZE]; // Create listen socket if ((listen_fd = socket(AF_INET, SOCK_STREAM, 0)) < 0) { fprintf(stderr, "Error: socket\n"); return EXIT_FAILURE; } // TCP port number int port = 8080; // Initialize server socket address struct sockaddr_in server_addr; memset(&server_addr, 0, sizeof(server_addr)); server_addr.sin_family = AF_INET; server_addr.sin_addr.s_addr = INADDR_ANY; server_addr.sin_port = htons(port); // Bind socket to an address if (bind(listen_fd, (struct sockaddr *)&server_addr, sizeof(server_addr)) < 0) { fprintf(stderr, "Error: bind\n"); return EXIT_FAILURE; } // Listen if (listen(listen_fd, BACKLOG_SIZE) < 0) { fprintf(stderr, "Error: listen\n"); return EXIT_FAILURE; } // Set INT signal handler signal(SIGINT, int_handle); fprintf(stderr, "listen on port %d\n", port); // Create epoll int epfd = epoll_create1(0); if (epfd < 0) { fprintf(stderr, "Error: epoll create\n"); close(listen_fd); return EXIT_FAILURE; } struct epoll_event listen_ev; memset(&listen_ev, 0, sizeof(listen_ev)); listen_ev.events = EPOLLIN; listen_ev.data.fd = listen_fd; if (epoll_ctl(epfd, EPOLL_CTL_ADD, listen_fd, &listen_ev) < 0) { fprintf(stderr, "Error: epoll ctl add listen\n"); close(listen_fd); return EXIT_FAILURE; } struct epoll_event evs[N_CLIENT]; while (1) { // Wait epoll listener int n_fds = epoll_wait(epfd, evs, N_CLIENT, -1); // Error epoll if (n_fds < 0) { fprintf(stderr, "Error: epoll wait\n"); close(listen_fd); return EXIT_FAILURE; } for (int i = 0; i < n_fds; i++) { if (evs[i].data.fd == listen_fd) { // Add epoll listener struct sockaddr_in client_addr; socklen_t len_client = sizeof(client_addr); int conn_fd; if ((conn_fd = accept(listen_fd, (struct sockaddr *)&client_addr, &len_client)) < 0) { fprintf(stderr, "Error: accept\n"); return EXIT_FAILURE; } printf("Accept socket %d (%s : %hu)\n", conn_fd, inet_ntoa(client_addr.sin_addr), ntohs(client_addr.sin_port)); struct epoll_event conn_ev; memset(&conn_ev, 0, sizeof(listen_ev)); conn_ev.events = EPOLLIN; conn_ev.data.fd = conn_fd; if (epoll_ctl(epfd, EPOLL_CTL_ADD, conn_fd, &conn_ev) < 0) { fprintf(stderr, "Error: epoll ctl add listen\n"); close(listen_fd); return EXIT_FAILURE; } } else if (evs[i].events & EPOLLIN) { // Read data from client int sock_fd = evs[i].data.fd; ssize_t n = read(sock_fd, buf, BUF_SIZE); if (n < 0) { fprintf(stderr, "Error: read from socket %d\n", sock_fd); close(sock_fd); } else if (n == 0) { // connection closed by client printf("Close socket %d\n", sock_fd); struct epoll_event sock_ev; memset(&sock_ev, 0, sizeof(listen_ev)); sock_ev.events = EPOLLIN; sock_ev.data.fd = sock_fd; if (epoll_ctl(epfd, EPOLL_CTL_DEL, sock_fd, &sock_ev) < 0) { fprintf(stderr, "Error: epoll ctl dell\n"); close(listen_fd); return EXIT_FAILURE; } close(sock_fd); } else { printf("Read %zu bytes from socket %d\n", n, sock_fd); write_n(sock_fd, buf, n); write_n(1, buf, n); } } } } close(listen_fd); return EXIT_SUCCESS; }
io_uring
https://kernel.dk/io_uring.pdf
linux kernel 5.1以降であれば使用可能な新しい非同期I/O APIです。submission queue (SQ)とcompletion queue (CQ)の二つのring bufferを操作することで処理をします。例えばソケットからメッセージを受け取るrecvmsg を実行するためには IORING_OP_RECVMSG
opcodeのsubmission queue entry (SQE)をSQに投入することで処理が非同期で実行されます。またそれらの処理の完了通知はCQからcompletion queue entry (CQE)を受け取ることで実現できます。
今回はliburing というio_uringのシンプルなインターフェイスを提供しているライブラリを使用して実装します。
#include <arpa/inet.h> #include <errno.h> #include <liburing.h> #include <signal.h> #include <stdio.h> #include <stdlib.h> #include <string.h> #include <sys/socket.h> #include <unistd.h> #define BACKLOG_SIZE 5 #define BUF_SIZE 1024 #define N_CLIENT 256 #define N_ENTRY 2048 #define GID 1 int listen_fd; enum { ACCEPT, READ, WRITE, }; typedef struct UserData { __u32 fd; __u16 type; } UserData; void int_handle(int n) { close(listen_fd); exit(EXIT_SUCCESS); } // wirte n byte ssize_t write_n(int fd, char *ptr, size_t n) { ssize_t n_left = n, n_written; while (n_left > 0) { if ((n_written = write(fd, ptr, n_left)) <= 0) { return n_written; } n_left -= n_written; ptr += n_written; } return EXIT_SUCCESS; } int main(int argc, char **argv) { char buf[BUF_SIZE] = {0}; // Create listen socket if ((listen_fd = socket(AF_INET, SOCK_STREAM, 0)) < 0) { fprintf(stderr, "Error: socket\n"); return EXIT_FAILURE; } // TCP port number int port = 8080; // Initialize server socket address struct sockaddr_in server_addr, client_addr; socklen_t client_len = sizeof(client_addr); memset(&server_addr, 0, sizeof(server_addr)); server_addr.sin_family = AF_INET; server_addr.sin_addr.s_addr = INADDR_ANY; server_addr.sin_port = htons(port); // Bind socket to an address if (bind(listen_fd, (struct sockaddr *)&server_addr, sizeof(server_addr)) < 0) { fprintf(stderr, "Error: bind\n"); return EXIT_FAILURE; } // Listen if (listen(listen_fd, BACKLOG_SIZE) < 0) { fprintf(stderr, "Error: listen\n"); return EXIT_FAILURE; } // Set INT signal handler signal(SIGINT, int_handle); fprintf(stderr, "listen on port %d\n", port); // Initialize io_uring struct io_uring ring; struct io_uring_sqe *sqe; struct io_uring_cqe *cqe; int init_ret = io_uring_queue_init(N_ENTRY, &ring, 0); if (init_ret < 0) { fprintf(stderr, "Error: init io_uring queue %d\n", init_ret); close(listen_fd); return EXIT_FAILURE; } // Setup first accept sqe = io_uring_get_sqe(&ring); io_uring_prep_accept(sqe, listen_fd, (struct sockaddr *)&client_addr, &client_len, 0); io_uring_sqe_set_flags(sqe, 0); UserData conn_info = { .fd = listen_fd, .type = ACCEPT, }; memcpy(&sqe->user_data, &conn_info, sizeof(conn_info)); while (1) { io_uring_submit(&ring); io_uring_wait_cqe(&ring, &cqe); struct UserData conn_info; memcpy(&conn_info, &cqe->user_data, sizeof(conn_info)); int type = conn_info.type; if (cqe->res == -ENOBUFS) { fprintf(stderr, "Error: no buffer %d\n", cqe->res); close(listen_fd); return EXIT_FAILURE; } else if (type == ACCEPT) { int conn_fd = cqe->res; printf("Accept socket %d \n", conn_fd); if (conn_fd >= 0) { // no error // Read from client sqe = io_uring_get_sqe(&ring); io_uring_prep_recv(sqe, conn_fd, buf, BUF_SIZE, 0); UserData read_info = { .fd = conn_fd, .type = READ, }; memcpy(&sqe->user_data, &read_info, sizeof(read_info)); } // Add new client sqe = io_uring_get_sqe(&ring); io_uring_prep_accept(sqe, listen_fd, (struct sockaddr *)&client_addr, &client_len, 0); io_uring_sqe_set_flags(sqe, 0); UserData conn_info = { .fd = listen_fd, .type = ACCEPT, }; memcpy(&sqe->user_data, &conn_info, sizeof(conn_info)); } else if (type == READ) { int n_byte = cqe->res; if (cqe->res <= 0) { // connection closed by client printf("Close socket %d\n", conn_info.fd); close(conn_info.fd); } else { // Add Write printf("Read %d bytes from socket %d\n", n_byte, conn_info.fd); sqe = io_uring_get_sqe(&ring); io_uring_prep_send(sqe, conn_info.fd, buf, n_byte, 0); write_n(1, buf, n_byte); // output stdout io_uring_sqe_set_flags(sqe, 0); UserData write_info = { .fd = conn_info.fd, .type = WRITE, }; memcpy(&sqe->user_data, &write_info, sizeof(write_info)); } } else if (type == WRITE) { // Add read sqe = io_uring_get_sqe(&ring); io_uring_prep_recv(sqe, conn_info.fd, buf, BUF_SIZE, 0); UserData read_info = { .fd = conn_info.fd, .type = READ, }; memcpy(&sqe->user_data, &read_info, sizeof(read_info)); } io_uring_cqe_seen(&ring, cqe); } close(listen_fd); return EXIT_SUCCESS; }
repository
実装したecho serverは、以下のrepositoryに置きました