suzuzusu日記

(´・ω・`)

fork, select, poll, epoll, io_uringのecho server

fork, select, poll, epoll, io_uringなどを使用してそれぞれecho serverを実装したのでそれぞれの仕様などを忘備録としてまとめていきます。エラー処理とか雑な部分があると思いますがご容赦ください。

環境構築

macOS上での実行を前提にします。io_uringを使う場合にはlinux kernel 5.1以上でないと動かないので、multipassというubuntuVMを手軽に作成できるツールを使って実行環境を構築します。

multipassのインストール

brew install --cask multipass

20.10のubuntuを用意して、gccやio_uringのライブラリであるliburingを入れます。

multipass launch 20.10 -n primary
multipass shell # login
sudo apt update -y
sudo apt install gcc liburing -y
uname -a
> Linux primary 5.8.0-43-generic #49-Ubuntu SMP Fri Feb 5 03:01:28 UTC 2021 x86_64 x86_64 x86_64 GNU/Linux

fork

fork(2) - Linux manual page

forkを使用したサーバでは、1クライアントに1プロセスが対応することになり、メモリが枯渇して性能に余裕があるのにもかかわらずレスポンス性能が大きく落ちます。

有名なC10K問題を引き起こします。

C10K問題(英語: C10K problem)とは、Apache HTTP ServerなどのWebサーバソフトウェアとクライアントの通信において、クライアントが約1万台に達すると、Webサーバーのハードウェア性能に余裕があるにも関わらず、レスポンス性能が大きく下がる問題である。

C10K問題 - Wikipedia

#include <arpa/inet.h>
#include <signal.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/socket.h>
#include <unistd.h>

#define BACKLOG_SIZE 5
#define BUF_SIZE 1024
#define INF_TIME -1
#define DISABLE -1

int listen_fd;

void int_handle(int n) {
  close(listen_fd);
  exit(EXIT_SUCCESS);
}

// wirte n byte
ssize_t write_n(int fd, char *ptr, size_t n) {
  ssize_t n_left = n, n_written;

  while (n_left > 0) {
    if ((n_written = write(fd, ptr, n_left)) <= 0) {
      return n_written;
    }
    n_left -= n_written;
    ptr += n_written;
  }

  return EXIT_SUCCESS;
}

int main(int argc, char **argv) {
  // Create listen socket
  if ((listen_fd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
    fprintf(stderr, "Error: socket\n");
    return EXIT_FAILURE;
  }

  // TCP port number
  int port = 8080;

  // Initialize server socket address
  struct sockaddr_in server_addr;
  memset(&server_addr, 0, sizeof(server_addr));
  server_addr.sin_family = AF_INET;
  server_addr.sin_addr.s_addr = INADDR_ANY;
  server_addr.sin_port = htons(port);

  // Bind socket to an address
  if (bind(listen_fd, (struct sockaddr *)&server_addr, sizeof(server_addr)) <
      0) {
    fprintf(stderr, "Error: bind\n");
    return EXIT_FAILURE;
  }

  // Listen
  if (listen(listen_fd, BACKLOG_SIZE) < 0) {
    fprintf(stderr, "Error: listen\n");
    return EXIT_FAILURE;
  }

  // Set INT signal handler
  signal(SIGINT, int_handle);

  fprintf(stderr, "listen on port %d\n", port);

  while (1) {
    // Check new connection
    struct sockaddr_in client_addr;
    socklen_t len_client = sizeof(client_addr);

    int conn_fd;
    if ((conn_fd = accept(listen_fd, (struct sockaddr *)&client_addr,
                          &len_client)) < 0) {
      fprintf(stderr, "Error: accept\n");
      return EXIT_FAILURE;
    }

    printf("Accept socket %d (%s : %hu)\n", conn_fd,
           inet_ntoa(client_addr.sin_addr), ntohs(client_addr.sin_port));

    pid_t pid = fork();

    if (pid < 0) {
      fprintf(stderr, "Error: fork\n");
      return EXIT_FAILURE;
    }

    if (pid == 0) {
      // child
      char buf[BUF_SIZE];
      close(listen_fd);
      while (1) {
        ssize_t n = read(conn_fd, buf, BUF_SIZE);

        if (n < 0) {
          fprintf(stderr, "Error: read from socket %d\n", conn_fd);
          close(conn_fd);
          exit(-1);
        } else if (n == 0) {  // connection closed by client
          printf("Close socket %d\n", conn_fd);
          close(conn_fd);
          exit(0);
        } else {
          printf("Read %zu bytes from socket %d\n", n, conn_fd);
          write_n(conn_fd, buf, n);
        }
      }
    } else {
      // parent
      close(conn_fd);
    }
  }

  close(listen_fd);
  return EXIT_SUCCESS;
}

I/O多重化

上記のようなマルチプロセスにより起きるC10K問題を解決する方法として、クライアント数にかかわらず1プロセスで処理するイベント駆動型プログラミングが提案されています。

イベント駆動型プログラミング(イベントくどうがたプログラミング、英: event-driven programming)は、コンピュータプログラムが起動すると共にイベントを待機し、発生したイベントに従って受動的に処理を行うプログラミングパラダイムのこと。

イベント駆動型プログラミング - Wikipedia

以降はI/O多重化によるecho serverを実装していきます。I/O多重化とは複数のI/Oデバイスの状態を同時に監視する仕組みです。例えば1プロセスで複数のクライアントを制御するためにこの技術は使用されます。

select

select(2) - Linux manual page

ディスクリプタを線形に探索する必要があるので計算量がO(n)かかります。管理できるディスクリプタ数に上限があるのが特徴です。

#include <arpa/inet.h>
#include <errno.h>
#include <signal.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/select.h>
#include <sys/socket.h>
#include <unistd.h>

#define BACKLOG_SIZE 5
#define BUF_SIZE 1024
#define N_CLIENT 256
#define INF_TIME -1
#define DISABLE -1

int listen_fd;

void int_handle(int n) {
  close(listen_fd);
  exit(EXIT_SUCCESS);
}

// wirte n byte
ssize_t write_n(int fd, char *ptr, size_t n) {
  ssize_t n_left = n, n_written;

  while (n_left > 0) {
    if ((n_written = write(fd, ptr, n_left)) <= 0) {
      return n_written;
    }
    n_left -= n_written;
    ptr += n_written;
  }

  return EXIT_SUCCESS;
}

int main(int argc, char **argv) {
  char buf[BUF_SIZE];
  fd_set fds;
  FD_ZERO(&fds);
  int clients[N_CLIENT];
  for (int i = 0; i < N_CLIENT; i++) {
    clients[i] = DISABLE;
  }
  memset(&fds, 0, sizeof(fds));

  // Create listen socket
  if ((listen_fd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
    fprintf(stderr, "Error: socket\n");
    return EXIT_FAILURE;
  }

  // Set INT signal handler
  signal(SIGINT, int_handle);

  // TCP port number
  int port = 8080;

  // Initialize server socket address
  struct sockaddr_in server_addr;
  memset(&server_addr, 0, sizeof(server_addr));
  server_addr.sin_family = AF_INET;
  server_addr.sin_addr.s_addr = INADDR_ANY;
  server_addr.sin_port = htons(port);

  // Bind socket to an address
  if (bind(listen_fd, (struct sockaddr *)&server_addr, sizeof(server_addr)) <
      0) {
    fprintf(stderr, "Error: bind\n");
    return EXIT_FAILURE;
  }

  // Listen
  if (listen(listen_fd, BACKLOG_SIZE) < 0) {
    fprintf(stderr, "Error: listen\n");
    return EXIT_FAILURE;
  }

  fprintf(stderr, "listen on port %d\n", port);

  FD_SET(listen_fd, &fds);

  int max_fd = listen_fd;  // max fd
  int max_i = 0;           // max client into clients[] array

  while (1) {
    FD_ZERO(&fds);
    FD_SET(listen_fd, &fds);
    for (int i = 0; i < N_CLIENT; i++) {
      if (clients[i] != DISABLE) {
        FD_SET(clients[i], &fds);
      }
    }

    int res_select = select(max_fd + 1, &fds, NULL, NULL, NULL);

    if (res_select < 0) {
      fprintf(stderr, "Error: select");
      return EXIT_FAILURE;
    }

    // Check new connection
    if (FD_ISSET(listen_fd, &fds)) {
      struct sockaddr_in client_addr;
      socklen_t len_client = sizeof(client_addr);

      int connfd;
      if ((connfd = accept(listen_fd, (struct sockaddr *)&client_addr,
                           &len_client)) < 0) {
        fprintf(stderr, "Error: accept\n");
        return EXIT_FAILURE;
      }

      printf("Accept socket %d (%s : %hu)\n", connfd,
             inet_ntoa(client_addr.sin_addr), ntohs(client_addr.sin_port));

      // Save client socket into clients array
      int i;
      for (i = 0; i < N_CLIENT; i++) {
        if (clients[i] == DISABLE) {
          clients[i] = connfd;
          break;
        }
      }

      // No enough space in clients array
      if (i == N_CLIENT) {
        fprintf(stderr, "Error: too many clients\n");
        close(connfd);
      }

      if (i > max_i) {
        max_i = i;
      }
      if (connfd > max_fd) {
        max_fd = connfd;
      }
    }

    // Check all clients to read data
    for (int i = 0; i <= max_i; i++) {
      int sock_fd;
      if ((sock_fd = clients[i]) == DISABLE) {
        continue;
      }

      // If the client is readable or errors occur
      ssize_t n = read(sock_fd, buf, BUF_SIZE);
      if (n < 0) {
        fprintf(stderr, "Error: read from socket %d\n", sock_fd);
        close(sock_fd);
        clients[i] = DISABLE;
      } else if (n == 0) {  // connection closed by client
        printf("Close socket %d\n", sock_fd);
        close(sock_fd);
        clients[i] = DISABLE;
      } else {
        printf("Read %zu bytes from socket %d\n", n, sock_fd);
        write_n(sock_fd, buf, n);
        write_n(1, buf, n);
      }
    }
  }

  close(listen_fd);
  return EXIT_SUCCESS;
}

poll

poll(2) - Linux manual page

selectとほとんど同じ機能であり、計算量も同じO(n)だけかかります。ただし、管理するディスクリプタの制限がない点で上位互換と捉えることができます。

#include <arpa/inet.h>
#include <errno.h>
#include <poll.h>
#include <signal.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/socket.h>
#include <unistd.h>

#define BACKLOG_SIZE 5
#define BUF_SIZE 1024
#define N_CLIENT 256
#define INF_TIME -1
#define DISABLE -1

int listen_fd;

void int_handle(int n) {
  close(listen_fd);
  exit(EXIT_SUCCESS);
}

// wirte n byte
ssize_t write_n(int fd, char *ptr, size_t n) {
  ssize_t n_left = n, n_written;

  while (n_left > 0) {
    if ((n_written = write(fd, ptr, n_left)) <= 0) {
      return n_written;
    }
    n_left -= n_written;
    ptr += n_written;
  }

  return EXIT_SUCCESS;
}

int main(int argc, char **argv) {
  char buf[BUF_SIZE];
  struct pollfd clients[N_CLIENT];

  // Create listen socket
  if ((listen_fd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
    fprintf(stderr, "Error: socket\n");
    return EXIT_FAILURE;
  }

  // TCP port number
  int port = 8080;

  // Initialize server socket address
  struct sockaddr_in server_addr;
  memset(&server_addr, 0, sizeof(server_addr));
  server_addr.sin_family = AF_INET;
  server_addr.sin_addr.s_addr = INADDR_ANY;
  server_addr.sin_port = htons(port);

  // Bind socket to an address
  if (bind(listen_fd, (struct sockaddr *)&server_addr, sizeof(server_addr)) <
      0) {
    fprintf(stderr, "Error: bind\n");
    return EXIT_FAILURE;
  }

  // Listen
  if (listen(listen_fd, BACKLOG_SIZE) < 0) {
    fprintf(stderr, "Error: listen\n");
    return EXIT_FAILURE;
  }

  // Set INT signal handler
  signal(SIGINT, int_handle);

  fprintf(stderr, "listen on port %d\n", port);

  clients[0].fd = listen_fd;
  clients[0].events = POLLIN;

  for (int i = 1; i < N_CLIENT; i++) {
    clients[i].fd = DISABLE;
  }
  int max_i = 0;  // max index into clients[] array

  while (1) {
    int n_ready = poll(clients, max_i + 1, INF_TIME);

    // Time out
    if (n_ready == 0) {
      continue;
    }

    // Error poll
    if (n_ready < 0) {
      fprintf(stderr, "Error: poll %d\n", errno);
      return errno;
    }

    // Check new connection
    if (clients[0].revents & POLLIN) {
      struct sockaddr_in client_addr;
      socklen_t len_client = sizeof(client_addr);

      int connfd;
      if ((connfd = accept(listen_fd, (struct sockaddr *)&client_addr,
                           &len_client)) < 0) {
        fprintf(stderr, "Error: accept\n");
        return EXIT_FAILURE;
      }

      printf("Accept socket %d (%s : %hu)\n", connfd,
             inet_ntoa(client_addr.sin_addr), ntohs(client_addr.sin_port));

      // Save client socket into clients array
      int i;
      for (i = 0; i < N_CLIENT; i++) {
        if (clients[i].fd == DISABLE) {
          clients[i].fd = connfd;
          break;
        }
      }

      // No enough space in clients array
      if (i == N_CLIENT) {
        fprintf(stderr, "Error: too many clients\n");
        close(connfd);
      }

      clients[i].events = POLLIN;

      if (i > max_i) {
        max_i = i;
      }
    }

    // Check all clients to read data
    for (int i = 1; i <= max_i; i++) {
      int sock_fd;
      if ((sock_fd = clients[i].fd) == DISABLE) {
        continue;
      }

      // If the client is readable or errors occur
      if (clients[i].revents & (POLLIN | POLLERR)) {
        ssize_t n = read(sock_fd, buf, BUF_SIZE);

        if (n < 0) {
          fprintf(stderr, "Error: read from socket %d\n", sock_fd);
          close(sock_fd);
          clients[i].fd = DISABLE;
        } else if (n == 0) {  // connection closed by client
          printf("Close socket %d\n", sock_fd);
          close(sock_fd);
          clients[i].fd = DISABLE;
        } else {
          printf("Read %zu bytes from socket %d\n", n, sock_fd);
          write_n(sock_fd, buf, n);
          write_n(1, buf, n);
        }
      }
    }
  }

  close(listen_fd);
  return EXIT_SUCCESS;
}

epoll

epoll(7) - Linux manual page

linux kernel 2.6以降であれば使用可能なAPIです。

ディスクリプタの状態をカーネル空間で監視しているため、直接変更のあるディスクリプタを返してくれるので計算量がO(1)となり高速に処理できます。

ただし、ディスクリプタが1の場合はepollにオーバーヘッドが存在して、pollの方が早い場合もあるので注意が必要です。

stackoverflow.com

https://monkey.org/~provos/libevent/libevent-benchmark2.jpg

#include <arpa/inet.h>
#include <errno.h>
#include <signal.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/epoll.h>
#include <sys/socket.h>
#include <unistd.h>

#define BACKLOG_SIZE 5
#define BUF_SIZE 1024
#define N_CLIENT 256
#define INF_TIME -1
#define DISABLE -1

int listen_fd;

void int_handle(int n) {
  close(listen_fd);
  exit(EXIT_SUCCESS);
}

// wirte n byte
ssize_t write_n(int fd, char *ptr, size_t n) {
  ssize_t n_left = n, n_written;

  while (n_left > 0) {
    if ((n_written = write(fd, ptr, n_left)) <= 0) {
      return n_written;
    }
    n_left -= n_written;
    ptr += n_written;
  }

  return EXIT_SUCCESS;
}

int main(int argc, char **argv) {
  char buf[BUF_SIZE];

  // Create listen socket
  if ((listen_fd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
    fprintf(stderr, "Error: socket\n");
    return EXIT_FAILURE;
  }

  // TCP port number
  int port = 8080;

  // Initialize server socket address
  struct sockaddr_in server_addr;
  memset(&server_addr, 0, sizeof(server_addr));
  server_addr.sin_family = AF_INET;
  server_addr.sin_addr.s_addr = INADDR_ANY;
  server_addr.sin_port = htons(port);

  // Bind socket to an address
  if (bind(listen_fd, (struct sockaddr *)&server_addr, sizeof(server_addr)) <
      0) {
    fprintf(stderr, "Error: bind\n");
    return EXIT_FAILURE;
  }

  // Listen
  if (listen(listen_fd, BACKLOG_SIZE) < 0) {
    fprintf(stderr, "Error: listen\n");
    return EXIT_FAILURE;
  }

  // Set INT signal handler
  signal(SIGINT, int_handle);

  fprintf(stderr, "listen on port %d\n", port);

  // Create epoll
  int epfd = epoll_create1(0);
  if (epfd < 0) {
    fprintf(stderr, "Error: epoll create\n");
    close(listen_fd);
    return EXIT_FAILURE;
  }

  struct epoll_event listen_ev;
  memset(&listen_ev, 0, sizeof(listen_ev));
  listen_ev.events = EPOLLIN;
  listen_ev.data.fd = listen_fd;
  if (epoll_ctl(epfd, EPOLL_CTL_ADD, listen_fd, &listen_ev) < 0) {
    fprintf(stderr, "Error: epoll ctl add listen\n");
    close(listen_fd);
    return EXIT_FAILURE;
  }

  struct epoll_event evs[N_CLIENT];
  while (1) {
    // Wait epoll listener
    int n_fds = epoll_wait(epfd, evs, N_CLIENT, -1);

    // Error epoll
    if (n_fds < 0) {
      fprintf(stderr, "Error: epoll wait\n");
      close(listen_fd);
      return EXIT_FAILURE;
    }

    for (int i = 0; i < n_fds; i++) {
      if (evs[i].data.fd == listen_fd) {  // Add epoll listener
        struct sockaddr_in client_addr;
        socklen_t len_client = sizeof(client_addr);

        int conn_fd;
        if ((conn_fd = accept(listen_fd, (struct sockaddr *)&client_addr,
                              &len_client)) < 0) {
          fprintf(stderr, "Error: accept\n");
          return EXIT_FAILURE;
        }

        printf("Accept socket %d (%s : %hu)\n", conn_fd,
               inet_ntoa(client_addr.sin_addr), ntohs(client_addr.sin_port));

        struct epoll_event conn_ev;
        memset(&conn_ev, 0, sizeof(listen_ev));
        conn_ev.events = EPOLLIN;
        conn_ev.data.fd = conn_fd;
        if (epoll_ctl(epfd, EPOLL_CTL_ADD, conn_fd, &conn_ev) < 0) {
          fprintf(stderr, "Error: epoll ctl add listen\n");
          close(listen_fd);
          return EXIT_FAILURE;
        }
      } else if (evs[i].events & EPOLLIN) {  // Read data from client
        int sock_fd = evs[i].data.fd;
        ssize_t n = read(sock_fd, buf, BUF_SIZE);

        if (n < 0) {
          fprintf(stderr, "Error: read from socket %d\n", sock_fd);
          close(sock_fd);
        } else if (n == 0) {  // connection closed by client
          printf("Close socket %d\n", sock_fd);
          struct epoll_event sock_ev;
          memset(&sock_ev, 0, sizeof(listen_ev));
          sock_ev.events = EPOLLIN;
          sock_ev.data.fd = sock_fd;
          if (epoll_ctl(epfd, EPOLL_CTL_DEL, sock_fd, &sock_ev) < 0) {
            fprintf(stderr, "Error: epoll ctl dell\n");
            close(listen_fd);
            return EXIT_FAILURE;
          }
          close(sock_fd);
        } else {
          printf("Read %zu bytes from socket %d\n", n, sock_fd);
          write_n(sock_fd, buf, n);
          write_n(1, buf, n);
        }
      }
    }
  }

  close(listen_fd);
  return EXIT_SUCCESS;
}

io_uring

https://kernel.dk/io_uring.pdf

linux kernel 5.1以降であれば使用可能な新しい非同期I/O APIです。submission queue (SQ)とcompletion queue (CQ)の二つのring bufferを操作することで処理をします。例えばソケットからメッセージを受け取るrecvmsg を実行するためには IORING_OP_RECVMSG opcodeのsubmission queue entry (SQE)をSQに投入することで処理が非同期で実行されます。またそれらの処理の完了通知はCQからcompletion queue entry (CQE)を受け取ることで実現できます。

今回はliburing というio_uringのシンプルなインターフェイスを提供しているライブラリを使用して実装します。

#include <arpa/inet.h>
#include <errno.h>
#include <liburing.h>
#include <signal.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/socket.h>
#include <unistd.h>

#define BACKLOG_SIZE 5
#define BUF_SIZE 1024
#define N_CLIENT 256
#define N_ENTRY 2048
#define GID 1

int listen_fd;

enum {
  ACCEPT,
  READ,
  WRITE,
};

typedef struct UserData {
  __u32 fd;
  __u16 type;
} UserData;

void int_handle(int n) {
  close(listen_fd);
  exit(EXIT_SUCCESS);
}

// wirte n byte
ssize_t write_n(int fd, char *ptr, size_t n) {
  ssize_t n_left = n, n_written;

  while (n_left > 0) {
    if ((n_written = write(fd, ptr, n_left)) <= 0) {
      return n_written;
    }
    n_left -= n_written;
    ptr += n_written;
  }

  return EXIT_SUCCESS;
}

int main(int argc, char **argv) {
  char buf[BUF_SIZE] = {0};

  // Create listen socket
  if ((listen_fd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
    fprintf(stderr, "Error: socket\n");
    return EXIT_FAILURE;
  }

  // TCP port number
  int port = 8080;

  // Initialize server socket address
  struct sockaddr_in server_addr, client_addr;
  socklen_t client_len = sizeof(client_addr);
  memset(&server_addr, 0, sizeof(server_addr));
  server_addr.sin_family = AF_INET;
  server_addr.sin_addr.s_addr = INADDR_ANY;
  server_addr.sin_port = htons(port);

  // Bind socket to an address
  if (bind(listen_fd, (struct sockaddr *)&server_addr, sizeof(server_addr)) <
      0) {
    fprintf(stderr, "Error: bind\n");
    return EXIT_FAILURE;
  }

  // Listen
  if (listen(listen_fd, BACKLOG_SIZE) < 0) {
    fprintf(stderr, "Error: listen\n");
    return EXIT_FAILURE;
  }

  // Set INT signal handler
  signal(SIGINT, int_handle);

  fprintf(stderr, "listen on port %d\n", port);

  // Initialize io_uring
  struct io_uring ring;
  struct io_uring_sqe *sqe;
  struct io_uring_cqe *cqe;
  int init_ret = io_uring_queue_init(N_ENTRY, &ring, 0);
  if (init_ret < 0) {
    fprintf(stderr, "Error: init io_uring queue %d\n", init_ret);
    close(listen_fd);
    return EXIT_FAILURE;
  }

  // Setup first accept
  sqe = io_uring_get_sqe(&ring);
  io_uring_prep_accept(sqe, listen_fd, (struct sockaddr *)&client_addr,
                       &client_len, 0);
  io_uring_sqe_set_flags(sqe, 0);
  UserData conn_info = {
      .fd = listen_fd,
      .type = ACCEPT,
  };
  memcpy(&sqe->user_data, &conn_info, sizeof(conn_info));

  while (1) {
    io_uring_submit(&ring);
    io_uring_wait_cqe(&ring, &cqe);
    struct UserData conn_info;
    memcpy(&conn_info, &cqe->user_data, sizeof(conn_info));
    int type = conn_info.type;

    if (cqe->res == -ENOBUFS) {
      fprintf(stderr, "Error: no buffer %d\n", cqe->res);
      close(listen_fd);
      return EXIT_FAILURE;
    } else if (type == ACCEPT) {
      int conn_fd = cqe->res;
      printf("Accept socket %d \n", conn_fd);
      if (conn_fd >= 0) {  // no error
        // Read from client
        sqe = io_uring_get_sqe(&ring);
        io_uring_prep_recv(sqe, conn_fd, buf, BUF_SIZE, 0);

        UserData read_info = {
            .fd = conn_fd,
            .type = READ,
        };
        memcpy(&sqe->user_data, &read_info, sizeof(read_info));
      }

      // Add new client
      sqe = io_uring_get_sqe(&ring);
      io_uring_prep_accept(sqe, listen_fd, (struct sockaddr *)&client_addr,
                           &client_len, 0);
      io_uring_sqe_set_flags(sqe, 0);
      UserData conn_info = {
          .fd = listen_fd,
          .type = ACCEPT,
      };
      memcpy(&sqe->user_data, &conn_info, sizeof(conn_info));
    } else if (type == READ) {
      int n_byte = cqe->res;
      if (cqe->res <= 0) {  // connection closed by client
        printf("Close socket %d\n", conn_info.fd);
        close(conn_info.fd);
      } else {
        // Add Write
        printf("Read %d bytes from socket %d\n", n_byte, conn_info.fd);
        sqe = io_uring_get_sqe(&ring);
        io_uring_prep_send(sqe, conn_info.fd, buf, n_byte, 0);
        write_n(1, buf, n_byte);  // output stdout
        io_uring_sqe_set_flags(sqe, 0);
        UserData write_info = {
            .fd = conn_info.fd,
            .type = WRITE,
        };
        memcpy(&sqe->user_data, &write_info, sizeof(write_info));
      }
    } else if (type == WRITE) {
      // Add read
      sqe = io_uring_get_sqe(&ring);
      io_uring_prep_recv(sqe, conn_info.fd, buf, BUF_SIZE, 0);

      UserData read_info = {
          .fd = conn_info.fd,
          .type = READ,
      };
      memcpy(&sqe->user_data, &read_info, sizeof(read_info));
    }

    io_uring_cqe_seen(&ring, cqe);
  }

  close(listen_fd);
  return EXIT_SUCCESS;
}

repository

実装したecho serverは、以下のrepositoryに置きました

github.com

参考