本次主要介绍如何通过socket
和pop_event
实现客户端、服务端之间的收发echo。
和本次关联提交的MR为https://gitee.com/popzilla/pophttp/pulls/2。共分为如下内容:
- 将
pop_event
动态库、头文件集成到pop_http
工程中。目前是直接合入二进制库和头文件,后续需要通过下载集成; - 合入客户端的消息收发实现,在
src/client/main.c
中; - 合入服务端的消息收发实现,在
src/server/main.c
中;
下文对实现方式进行详细的说明。
1. Client
客户端主要流程如下:
- 连接服务端;
- 将客户端的连接描述符加入到事件监听中;
- 将标准输入的连接描述符加入到事件监听中;
- 接收到标准输入时,将接收的文本发送给服务端;
- 接收到服务端消息时,将接收的文本打印到屏幕上;
连接服务端
此处需要将IP地址和端口转换为网络序,另为了提升事件监听效率需要将描述符设置为非阻塞模式。
在进行connect
之时,因为是非阻塞模式,调用connect
函数之后可能不会立即返回成功,而处于连接中的状态,此时的错误码为EINPROGRESS
。
static int connect_server(uint32_t ip, uint16_t port)
{
int fd = socket(AF_INET, SOCK_STREAM, 0);
if (fd < 0) {
return -1;
}
set_fd_nonblock(fd);
struct sockaddr_in sin = { 0 };
sin.sin_family = AF_INET;
sin.sin_addr.s_addr = htonl(ip);
sin.sin_port = htons(port);
int ret = connect(fd, (struct sockaddr*)&sin, sizeof(sin));
if (ret < 0) {
if (errno != EINPROGRESS) {
return -1;
} else {
printf("Connect in progress.\n");
}
}
return fd;
}
将标准输入和socket连接加入到事件监听
为了便于描述符的管理,这里定义了结构体进行承载。
struct client {
int input_fd;
int conn_fd;
bool server_hup; /* 服务端断开标记 */
};
int stdin = 0; /* 标准输入 */
static struct client client = { 0 };
client.input_fd = stdin;
client.conn_fd = conn_fd;
uint32_t what = POP_EVENT_ET | POP_EVENT_IN | POP_EVENT_RDHUP;
pop_event_t *conn_evt = pop_event_create(conn_fd, &client);
pop_event_add_watch(conn_evt, what, client_proc);
pop_event_t *input_evt = pop_event_create(stdin, &client);
what = POP_EVENT_ET | POP_EVENT_IN;
pop_event_add_watch(input_evt, what, input_proc);
处理标准输入事件
当标准输入出现可读事件时即标识有用户输入,此时将标准输入的内容读出,然后将内容写入到socket
连接中即发送给了服务端。
static void input_proc(pop_event_t *event, uint32_t what, void *usrdata)
{
struct client *client = (struct client *)usrdata;
if (what & POP_EVENT_IN) {
input_read(client);
}
}
static void client_write(int fd, uint8_t *buf, size_t len)
{
write(fd, buf, len);
printf("client write: %s", buf);
}
static void input_read(struct client *client)
{
uint8_t buf[RECV_BUF_LEN];
int len = read(client->input_fd, buf, sizeof(buf) - 1);
client_write(client->conn_fd, buf, len);
}
处理客户端事件
客户端事件这里只处理可读和对端断开事件(为了简单此处客户端可写事件不进行处理)。
当客户端可读时,将数据读取出打印出来;
当客户端收到了HUP
事件,即对端断开了,此处不能立即销毁event
对象,需要设置server_hup
标记后使用pop_event_destroy
销毁。
static void client_proc(pop_event_t *event, uint32_t what, void *usrdata)
{
struct client *client = (struct client *)usrdata;
if (what & POP_EVENT_IN) {
client_read(client);
}
if (what & POP_EVENT_RDHUP) {
server_hup(client);
}
if (client->server_hup) {
pop_event_destroy(event);
}
}
static void client_read(struct client *client)
{
uint8_t buf[RECV_BUF_LEN];
int len = read(client->conn_fd, buf, sizeof(buf) - 1);
buf[len] = '\0';
printf("client recv: %s", buf);
}
static void server_hup(struct client *client)
{
printf("Server hang up.\n");
close(client->conn_fd);
client->conn_fd = -1;
client->server_hup = true;
}
2. Server
服务端处理流程如下:
- 监听服务端口;
- 将服务监听的描述符加入到事件监听中;
- 处理监听描述符的事件,当事件可读时即说明有客户端连接,此时接受连接并将客户端连接加入到事件监听中;
- 处理客户端连接可读事件,当可读时将数据读出并添加进行发送缓冲区,并设置监听可写事件;
- 当客户端可写事件到来时,将发送缓冲区的数据发送给客户端。
监听服务端口
此处需要注意设置描述符的REUSE
和NONBLOCK
。REUSE
的作用可见socket 端口复用 SO_REUSEPORT 与 SO_REUSEADDR;NONBLOCK
的作用为设置非阻塞,为了提升事件处理效率;
static int server_listen(uint32_t ip, uint16_t port)
{
int fd = socket(AF_INET, SOCK_STREAM, 0);
if (fd < 0) {
printf("Failed to create socket, errno %d.\n", errno);
return -1;
}
set_fd_reuse(fd);
set_fd_nonblock(fd);
struct sockaddr_in sin = { 0 };
sin.sin_family = AF_INET;
sin.sin_addr.s_addr = htonl(ip);
sin.sin_port = htons(port);
if (bind(fd, (struct sockaddr*)&sin, sizeof(sin)) < 0) {
printf("Failed to bind socket, errno %d.\n", errno);
close(fd);
return -1;
}
if (listen(fd, MAX_BACKLOG_NUM) < 0) {
printf("Failed to listen, errno %d\n", errno);
close(fd);
return -1;
}
return fd;
}
处理客户端连接
当客户端连接到来时,需要接收连接并将产生的连接描述符添加进事件监听中。
处理客户端连接可读事件
当客户端连接可读时,需要使用read
函数循环读取连接描述符直到返回-1
提示EAGIN
即代表缓冲区已读空。
当缓冲区读空时,因为我们这里实现的是服务端echo功能,所以需要将接收到的数据原模原样发送回客户端。所以需要将数据拷贝进发送缓冲区,此处不直接发送的原因为协议栈发送缓冲区可能一次承载不了这些数据,需要将数据加入到我们自己的发送缓冲区,然后监听客户端连接的可写事件。当连接可写时,再从发送缓冲区读出发送出去。
static void conn_recv_proc(struct sock_conn *conn)
{
int len;
conn->recv_len = 0;
do {
uint8_t recv_buf[RECV_BUF_LEN];
len = read(conn->fd, recv_buf, sizeof(recv_buf) - 1);
if (len == 0) {
printf("Recv error fd %d\n", conn->fd);
conn->destroy = true;
return;
} else if (len < 0) {
break;
}
/* 缓冲区满了先不收了,实际需要业务处理时判断 */
if (conn->recv_len + len >= sizeof(conn->recv_buf)) {
continue;
}
memcpy(conn->recv_buf + conn->recv_len, recv_buf, len);
conn->recv_len += len;
} while(len > 0);
conn->recv_buf[conn->recv_len] = '\0';
printf("recv: %s\n", conn->recv_buf);
memcpy(conn->send_buf, conn->recv_buf, sizeof(conn->recv_buf));
conn->send_len = conn->recv_len;
conn_trigger_send(conn);
}
处理客户端连接的可写事件
当客户端连接可写时,需要不断地从发送缓冲区读取数据,并写入客户端连接。由于协议栈发送缓冲区会被写满,所以当满了之后需要返回,等待下一次可写事件的到来。直到所有的数据都发送完毕后,再设置停止发送即从监听的事件列表中删除写事件。
static void conn_send_proc(struct sock_conn *conn)
{
int left = conn->send_len;
int len;
do {
uint8_t *send_pos = conn->send_buf + (conn->send_len - left);
len = write(conn->fd, send_pos, left);
if (len == 0) {
conn->destroy = true;
} else if (len < 0) {
break;
}
left -= len;
} while (left > 0);
if (left <= 0) {
conn_pause_send(conn);
}
}
static void conn_pause_send(struct sock_conn *conn)
{
uint32_t what = POP_EVENT_IN | POP_EVENT_RDHUP |
POP_EVENT_ET;
pop_event_mod_watch(conn->event, what, conn_proc);
printf("Conn pause send %d\n", conn->fd);
}