From be76eb42524e97f240e7dfca8d3db63df1ca6ed9 Mon Sep 17 00:00:00 2001 From: liqiang Date: Thu, 14 Dec 2023 21:02:02 +0800 Subject: [PATCH] bugfix: fifo suspend and fd leak Signed-off-by: liqiang --- NOTE(review), not part of the commit message: (1) fifo_proc_unknown now loads evt->priv into oarg before pthread_mutex_lock(&fifomutex), while fifo_open_thread clears evt->priv under that mutex; read priv only inside the locked region (the re-read after pthread_cancel then becomes the only read). (2) fifo_open_thread: when evt->priv == NULL the new branch jumps to end:, which frees oarg->t, oarg->req and oarg; if fifo_proc_unknown already freed them (with the default deferred cancel type, pthread_cancel does not act inside pthread_mutex_lock, which is not a cancellation point) this looks like a double free, and the opened fd appears not to be closed on that path. The thread also keeps dereferencing evt, which the fifo_proc_unknown comment says is deleted - TODO confirm these lifetimes. (3) #define log_switch 0 replaces a global int; if log.h declares extern int log_switch; this no longer compiles - verify. (4) if (evt->peerfd != 0) close(evt->peerfd); treats fd 0 as "no fd", but 0 is a valid descriptor; confirm how peerfd is initialised (-1 is the usual sentinel). (5) fifo_proc_writeable resumes the peer with EPOLLIN on success but with EPOLLIN|EPOLLHUP on error, while fifo_proc_readable uses EPOLLIN|EPOLLHUP on both paths; epoll always reports EPOLLHUP, so this is only an inconsistency. (6) if fifo_resume_event fails after the success reply, goto err_ack presumably sends a second (error) reply for the same request and retries the same EPOLL_CTL_ADD - confirm against the lines elided from the hunks. (7) fifo_proc_open_req ignores the return value of pthread_create; on failure evt->handler still becomes fifo_proc_unknown, which would pthread_cancel a thread that was never created. (8) FIFO_PEER_ADD now increments fifo_peer_index; no bound check against EPOLL_MAX_EVENT_NUMS is visible in this patch - confirm one exists. (9) The Chinese comments on added lines were translated to English; hunk line counts are unchanged, but the post-image blob ids on the index lines no longer match. qtfs/qtfs/syscall.c | 1 - qtfs/qtfs_server/server_fifo.c | 148 ++++++++++++++++++++++----------- 2 files changed, 99 insertions(+), 50 deletions(-) diff --git a/qtfs/qtfs/syscall.c b/qtfs/qtfs/syscall.c index 61a0de0..e4ee26e 100644 --- a/qtfs/qtfs/syscall.c +++ b/qtfs/qtfs/syscall.c @@ -56,7 +56,6 @@ static inline int qtfs_fstype_judgment(char __user *dir) return 1; } path_put(&path); - qtfs_info("qtfs fstype judge <%s> is not qtfs.\n", path.dentry->d_iname); return 0; } diff --git a/qtfs/qtfs_server/server_fifo.c b/qtfs/qtfs_server/server_fifo.c index 0eaadea..88d3cde 100644 --- a/qtfs/qtfs_server/server_fifo.c +++ b/qtfs/qtfs_server/server_fifo.c @@ -21,7 +21,7 @@ #include #include -int log_switch = 0; +#define log_switch 0 #include "req.h" #include "log.h" #include "libsocket.h" @@ -73,11 +73,9 @@ struct fifo_event_t { /* 触发时的操作函数 */ int (*handler)(struct fifo_event_t *event); // 仅在open阻塞状态有效,open完成后应该置空 - union { - void *priv; - int len; // valid read or write len - int peerfd; // priv fd - }; + void *priv; + int len; // valid read or write len + int peerfd; // priv fd unsigned long seq_num; int block; // block fifo or nonblock }; @@ -177,6 +175,18 @@ void fifo_del_event(struct fifo_event_t *evt) return; } +int fifo_resume_event(struct fifo_event_t *evt, unsigned int events) +{ + struct epoll_event event; + event.data.ptr = (void *)evt; + event.events = events; + if (-1 == epoll_ctl(epollfd, EPOLL_CTL_ADD, evt->fd, &event)) { + log_err("epoll ctl add fd:%d event failed, errno:%d.", evt->fd, errno); + return -1; + } + return 0; +} + void fifo_suspend_event(struct fifo_event_t *evt) { struct epoll_event event; @@ -184,7 +194,6 @@ void fifo_suspend_event(struct fifo_event_t *evt) if (epoll_ctl(epollfd, EPOLL_CTL_DEL, evt->fd, &event) == -1) { log_err("suspend event 
fd:%d failed, errno:%d", evt->fd, errno); } - free(evt); return; } @@ -203,7 +212,7 @@ static int fifo_del_peer(int flag, struct fifo_event_t *me) memset(fifo_peer_evt, 0, sizeof(struct fifo_event_t *) * EPOLL_MAX_EVENT_NUMS); break; case FIFO_PEER_ADD: - fifo_peer_evt[fifo_peer_index] = me->peerevt; + fifo_peer_evt[fifo_peer_index++] = me->peerevt; break; case FIFO_PEER_POST: for (int i = 0; i < fifo_peer_index; i++) { @@ -260,13 +269,15 @@ static void fifo_proc_ack(struct fifo_event_t *evt, int type, int sockfd, char * int fifo_proc_unknown(struct fifo_event_t *evt) { - struct open_arg_t *oarg; + struct open_arg_t *oarg = (struct open_arg_t *)evt->priv; log_info("unknown read/write event fd:%d happend, open event not complete!", evt->fd); // 这不是预期的事件,直接删除此事件,且关联删除open线程 pthread_mutex_lock(&fifomutex); + // If priv is NULL, the open thread has already exited if (evt->priv) { - oarg = (struct open_arg_t *)evt->priv; + // priv is non-NULL: the open thread is still blocked, so cancel the thread first, then free its resources and clear priv. pthread_cancel(*oarg->t); + oarg = (struct open_arg_t *)evt->priv; free(oarg->t); free(oarg->req); free(oarg); @@ -323,7 +334,10 @@ int fifo_proc_readable(struct fifo_event_t *evt) log_info("readable event fd:%d peerfd:%d, errno:%d", evt->fd, evt->peerevt->fd, errno); free(msg); - evt->peerevt->peerevt = NULL; + // Suspend the readable task and resume listening for network requests + if (fifo_resume_event(evt->peerevt, EPOLLIN|EPOLLHUP) == -1) { + goto err_ack; + } // 读完立即删除本监听,如果继续读后面再添加进来 return FIFO_RET_SUSPEND; @@ -334,7 +348,8 @@ err_ack: errrsp.len = 0; fifo_proc_ack(evt, QTFS_REQ_READITER, evt->peerevt->fd, (char *)&errrsp, sizeof(errrsp)); } while (0); - evt->peerevt->peerevt = NULL; + if (fifo_resume_event(evt->peerevt, EPOLLIN|EPOLLHUP) == -1) + return FIFO_RET_DEL_BOTH; return error_ret; } @@ -380,7 +395,10 @@ int fifo_proc_writeable(struct fifo_event_t *evt) log_info("writeable event fd:%d peerfd:%d, writelen:%lu, errno:%d", evt->fd, evt->peerevt->fd, rsp.len, errno); free(msg); - evt->peerevt->peerevt = NULL; + // Suspend the fifo write task and restart the task that listens for network requests + if 
(fifo_resume_event(evt->peerevt, EPOLLIN) == -1) { + goto err_ack; + } return FIFO_RET_SUSPEND; err_ack: @@ -390,6 +408,8 @@ err_ack: errrsp.len = 0; fifo_proc_ack(evt, QTFS_REQ_WRITE, evt->peerevt->fd, (char *)&errrsp, sizeof(errrsp)); } while (0); + if (fifo_resume_event(evt->peerevt, EPOLLIN|EPOLLHUP) == -1) + return FIFO_RET_DEL_BOTH; return error_ret; } @@ -402,9 +422,10 @@ int fifo_proc_read_req(struct fifo_event_t *evt) ret = fifo_recv_with_timeout(evt->fd, (char *)&req, sizeof(req)); if (ret <= 0) { log_err("recv fifo read head failed, errno:%d.", errno); - // 主线程是串行的,peerevt如果是空,则没有readable监听,直接close peerfd即可 + // The connection was interrupted early and peerevt may not exist yet, so just close the fd if (evt->peerevt == NULL) { - close(evt->peerfd); + if (evt->peerfd != 0) + close(evt->peerfd); return FIFO_RET_DEL; } // 如果peerevt非空则要同时删除peer事件 @@ -422,16 +443,31 @@ int fifo_proc_read_req(struct fifo_event_t *evt) } // if fifo is block, dont block on main thread - struct fifo_event_t *newevt = fifo_add_event(evt->peerfd, evt, fifo_proc_readable, NULL, EPOLLIN); - if (newevt == NULL) { - log_err("add readable event failed, fd:%d socketfd:%d", evt->peerfd, evt->fd); - return FIFO_RET_ERR; + if (evt->peerevt == NULL) { + struct fifo_event_t *newevt = fifo_add_event(evt->peerfd, evt, fifo_proc_readable, NULL, EPOLLIN); + if (newevt == NULL) { + log_err("add readable event failed, fd:%d socketfd:%d", evt->peerfd, evt->fd); + goto early_ack; + } + evt->peerevt = newevt; + newevt->len = req.len; + newevt->seq_num = evt->seq_num; + } else { + evt->peerevt->seq_num = evt->seq_num; + evt->peerevt->len = req.len; + if (fifo_resume_event(evt->peerevt, EPOLLIN) == -1) + goto early_ack; } - evt->peerevt = newevt; - newevt->len = req.len; - newevt->seq_num = evt->seq_num; + return FIFO_RET_SUSPEND; - return FIFO_RET_OK; +early_ack: + do { + struct qtrsp_fifo_read rsp; + rsp.err = -EFAULT; + rsp.len = 0; + fifo_proc_ack(evt, QTFS_REQ_READITER, evt->fd, (char *)&rsp, sizeof(rsp)); + } while (0); + return FIFO_RET_DEL_BOTH; } // 写 
@@ -442,9 +478,10 @@ int fifo_proc_write_req(struct fifo_event_t *evt) ret = fifo_recv_with_timeout(evt->fd, (char *)&req, sizeof(req)); if (ret <= 0) { log_err("recv fifo write head failed, errno:%d.", errno); - // 主线程是串行的,peerevt如果是空,则没有readable监听,直接close peerfd即可 + // The connection was interrupted early and peerevt may not exist yet, so just close the fd if (evt->peerevt == NULL) { - close(evt->peerfd); + if (evt->peerfd != 0) + close(evt->peerfd); return FIFO_RET_DEL; } // 如果peerevt非空则要同时删除peer事件 @@ -461,16 +498,34 @@ int fifo_proc_write_req(struct fifo_event_t *evt) return FIFO_RET_OK; } // if fifo is block, dont block on main thread - struct fifo_event_t *newevt = fifo_add_event(evt->peerfd, evt, fifo_proc_writeable, NULL, EPOLLOUT); - if (newevt == NULL) { - log_err("add writeable event failed, fd:%d socketfd:%d", evt->peerfd, evt->fd); - return FIFO_RET_ERR; + if (evt->peerevt == NULL) { + struct fifo_event_t *newevt = fifo_add_event(evt->peerfd, evt, fifo_proc_writeable, NULL, EPOLLOUT); + if (newevt == NULL) { + log_err("add writeable event failed, fd:%d socketfd:%d", evt->peerfd, evt->fd); + goto early_ack; + } + newevt->len = req.len; + newevt->seq_num = evt->seq_num; + evt->peerevt = newevt; + } else { + evt->peerevt->seq_num = evt->seq_num; + evt->peerevt->len = req.len; + if (fifo_resume_event(evt->peerevt, EPOLLOUT) == -1) { + goto early_ack; + } } - evt->peerevt = newevt; - newevt->len = req.len; - newevt->seq_num = evt->seq_num; - return FIFO_RET_OK; + // The tcp fd now has to be handed to the writeable task and cannot be used by both at once, so suspend this task (stop listening) and switch back once the write is done + return FIFO_RET_SUSPEND; +early_ack: + do { + struct qtrsp_fifo_write rsp; + rsp.err = -EFAULT; + rsp.len = 0; + fifo_proc_ack(evt, QTFS_REQ_WRITE, evt->fd, (char *)&rsp, sizeof(rsp)); + } while (0); + return FIFO_RET_DEL_BOTH; + } // read/write/close req @@ -481,11 +536,6 @@ int fifo_proc_new_req(struct fifo_event_t *evt) ret = fifo_recv_with_timeout(evt->fd, (char *)&head, sizeof(struct qtreq)); if (ret <= 0) { log_err("recv qtreq head failed, errno:%d.", errno); - // 
主线程是串行的,peerevt如果是空,则没有readable监听,直接close peerfd即可 - if (evt->peerevt == NULL) { - close(evt->peerfd); - return FIFO_RET_DEL; - } // 如果peerevt非空则要同时删除peer事件 return FIFO_RET_DEL_BOTH; } @@ -522,7 +572,8 @@ int fifo_proc_new_req(struct fifo_event_t *evt) void *fifo_open_thread(void *arg) { int fd; - struct open_arg_t *oarg = (struct open_arg_t *)arg; + struct fifo_event_t *evt = (struct fifo_event_t *)arg; + struct open_arg_t *oarg = (struct open_arg_t *)evt->priv; int rw; int err = 0; struct fifo_event_t *newevt; @@ -540,15 +591,13 @@ void *fifo_open_thread(void *arg) // 代理不应该主动,只监听挂断事件,在通信对端发来read/write消息才 // 改为监听可读/可写状态并进行实际读写。 pthread_mutex_lock(&fifomutex); - if (rw == FIFO_READ) { - oarg->main_evt->peerevt = NULL; - oarg->main_evt->peerfd = fd; - oarg->main_evt->handler = fifo_proc_new_req; - } else { - oarg->main_evt->peerevt = NULL; - oarg->main_evt->handler = fifo_proc_new_req; - oarg->main_evt->peerfd = fd; + if (evt->priv == NULL) { + log_err("fatal error, oarg is invalid."); + goto end; } + oarg->main_evt->peerevt = NULL; + oarg->main_evt->handler = fifo_proc_new_req; + oarg->main_evt->peerfd = fd; oarg->main_evt->block = fifo_block_flags(oarg->req->flags); rsp.fd = fd; @@ -565,6 +614,7 @@ end: free(oarg->t); free(oarg->req); free(oarg); + evt->priv = NULL; pthread_mutex_unlock(&fifomutex); return NULL; } @@ -624,11 +674,11 @@ int fifo_proc_open_req(struct fifo_event_t *evt) free(oarg); return FIFO_RET_ERR; } - pthread_create(t, &attr, fifo_open_thread, oarg); + evt->priv = oarg; oarg->t = t; + pthread_create(t, &attr, fifo_open_thread, evt); // 临时状态机,暂时不知道是读是写 - evt->priv = oarg; evt->handler = fifo_proc_unknown; log_info("Start new fifo open thread head:%u, len:%d", head.type, head.len); -- 2.37.1 (Apple Git-137.1)