Projects
openEuler:22.03:LTS
gazelle
_service:tar_scm_kernel_repo:0135-rpc-function-...
Sign Up
Log In
Username
Password
Overview
Repositories
Revisions
Requests
Users
Attributes
Meta
File _service:tar_scm_kernel_repo:0135-rpc-function-does-not-depend-on-protocol-stack.patch of Package gazelle
From fe39b43f897be7d29f9b51e79d51395e43b83e23 Mon Sep 17 00:00:00 2001 From: jiangheng <jiangheng14@huawei.com> Date: Sun, 4 Feb 2024 19:46:17 +0800 Subject: [PATCH] rpc function does not depend on protocol stack diff rpc queue and dfx rpc queue --- src/common/gazelle_dfx_msg.h | 1 - src/lstack/api/lstack_rtw_api.c | 36 ++- src/lstack/core/lstack_control_plane.c | 10 +- src/lstack/core/lstack_dpdk.c | 4 +- src/lstack/core/lstack_lwip.c | 2 +- src/lstack/core/lstack_protocol_stack.c | 92 ++++---- src/lstack/core/lstack_stack_stat.c | 18 +- src/lstack/core/lstack_thread_rpc.c | 241 ++++++++------------- src/lstack/include/lstack_control_plane.h | 3 - src/lstack/include/lstack_protocol_stack.h | 36 +-- src/lstack/include/lstack_rpc_proc.h | 46 ++++ src/lstack/include/lstack_thread_rpc.h | 79 +++---- src/lstack/netif/lstack_ethdev.c | 6 +- src/ltran/ltran_dfx.c | 3 +- 14 files changed, 292 insertions(+), 285 deletions(-) create mode 100644 src/lstack/include/lstack_rpc_proc.h diff --git a/src/common/gazelle_dfx_msg.h b/src/common/gazelle_dfx_msg.h index 1ca210b..a91a30f 100644 --- a/src/common/gazelle_dfx_msg.h +++ b/src/common/gazelle_dfx_msg.h @@ -101,7 +101,6 @@ struct gazelle_stack_aggregate_stats { struct gazelle_stat_pkts { uint16_t conn_num; uint32_t mbufpool_avail_cnt; - uint32_t rpcpool_avail_cnt; uint64_t call_msg_cnt; uint64_t recv_list_cnt; uint64_t call_alloc_fail; diff --git a/src/lstack/api/lstack_rtw_api.c b/src/lstack/api/lstack_rtw_api.c index 10bc613..8498b8e 100644 --- a/src/lstack/api/lstack_rtw_api.c +++ b/src/lstack/api/lstack_rtw_api.c @@ -28,7 +28,11 @@ int rtw_socket(int domain, int type, int protocol) { - return rpc_call_socket(domain, type, protocol); + struct protocol_stack *stack = get_bind_protocol_stack(); + if (stack == NULL) { + GAZELLE_RETURN(EINVAL); + } + return rpc_call_socket(&stack->rpc_queue, domain, type, protocol); } int rtw_accept(int s, struct sockaddr *addr, socklen_t *addrlen) @@ -64,27 +68,47 @@ int rtw_listen(int s, int backlog) int rtw_connect(int s, const struct sockaddr *name, socklen_t namelen) { - return rpc_call_connect(s, name, namelen); + struct protocol_stack *stack = get_protocol_stack_by_fd(s); + if (stack == NULL) { + GAZELLE_RETURN(EBADF); + } + return rpc_call_connect(&stack->rpc_queue, s, name, namelen); } int rtw_setsockopt(int s, int level, int optname, const void *optval, socklen_t optlen) { - return rpc_call_setsockopt(s, level, optname, optval, optlen); + struct protocol_stack *stack = get_protocol_stack_by_fd(s); + if (stack == NULL) { + GAZELLE_RETURN(EBADF); + } + return rpc_call_setsockopt(&stack->rpc_queue, s, level, optname, optval, optlen); } int rtw_getsockopt(int s, int level, int optname, void *optval, socklen_t *optlen) { - return rpc_call_getsockopt(s, level, optname, optval, optlen); + struct protocol_stack *stack = get_protocol_stack_by_fd(s); + if (stack == NULL) { + GAZELLE_RETURN(EBADF); + } + return rpc_call_getsockopt(&stack->rpc_queue, s, level, optname, optval, optlen); } int rtw_getpeername(int s, struct sockaddr *name, socklen_t *namelen) { - return rpc_call_getpeername(s, name, namelen); + struct protocol_stack *stack = get_protocol_stack_by_fd(s); + if (stack == NULL) { + GAZELLE_RETURN(EBADF); + } + return rpc_call_getpeername(&stack->rpc_queue, s, name, namelen); } int rtw_getsockname(int s, struct sockaddr *name, socklen_t *namelen) { - return rpc_call_getsockname(s, name, namelen); + struct protocol_stack *stack = get_protocol_stack_by_fd(s); + if (stack == NULL) { + GAZELLE_RETURN(EBADF); + } + return rpc_call_getsockname(&stack->rpc_queue, s, name, namelen); } ssize_t rtw_read(int s, void *mem, size_t len) diff --git a/src/lstack/core/lstack_control_plane.c b/src/lstack/core/lstack_control_plane.c index a9a3814..025291d 100644 --- a/src/lstack/core/lstack_control_plane.c +++ b/src/lstack/core/lstack_control_plane.c @@ -611,9 +611,10 @@ static int32_t thread_register(void) /* register all connected conn before listen conn, avoid creating new conn */ struct protocol_stack_group *stack_group = get_protocol_stack_group(); for (int32_t i = 0; i < stack_group->stack_num; i++) { - conn->conn_num = rpc_call_conntable(stack_group->stacks[i], conn->conn_list, GAZELLE_LSTACK_MAX_CONN); + conn->conn_num = rpc_call_conntable(&stack_group->stacks[i]->rpc_queue, + conn->conn_list, GAZELLE_LSTACK_MAX_CONN); - ret = rpc_call_thread_regphase1(stack_group->stacks[i], conn); + ret = rpc_call_thread_regphase1(&stack_group->stacks[i]->rpc_queue, conn); if (ret != 0) { LSTACK_LOG(ERR, LSTACK, "thread_register_phase1 failed ret=%d!\n", ret); free(conn); @@ -622,9 +623,10 @@ static int32_t thread_register(void) } for (int32_t i = 0; i < stack_group->stack_num; i++) { - conn->conn_num = rpc_call_conntable(stack_group->stacks[i], conn->conn_list, GAZELLE_LSTACK_MAX_CONN); + conn->conn_num = rpc_call_conntable(&stack_group->stacks[i]->rpc_queue, + conn->conn_list, GAZELLE_LSTACK_MAX_CONN); - ret = rpc_call_thread_regphase2(stack_group->stacks[i], conn); + ret = rpc_call_thread_regphase2(&stack_group->stacks[i]->rpc_queue, conn); if (ret != 0) { LSTACK_LOG(ERR, LSTACK, "thread_register_phase2 failed ret=%d!\n", ret); free(conn); diff --git a/src/lstack/core/lstack_dpdk.c b/src/lstack/core/lstack_dpdk.c index e352850..985f1a5 100644 --- a/src/lstack/core/lstack_dpdk.c +++ b/src/lstack/core/lstack_dpdk.c @@ -45,7 +45,6 @@ #include "lstack_log.h" #include "dpdk_common.h" -#include "lstack_lockless_queue.h" #include "lstack_protocol_stack.h" #include "lstack_thread_rpc.h" #include "lstack_lwip.h" @@ -258,7 +257,8 @@ struct rte_mempool *create_mempool(const char *name, uint32_t count, uint32_t si int32_t create_shared_ring(struct protocol_stack *stack) { - lockless_queue_init(&stack->rpc_queue); + rpc_queue_init(&stack->rpc_queue); + rpc_queue_init(&stack->dfx_rpc_queue); if (use_ltran()) { stack->rx_ring = gazelle_ring_create_fast("RING_RX", VDEV_RX_QUEUE_SZ, RING_F_SP_ENQ | RING_F_SC_DEQ); diff --git a/src/lstack/core/lstack_lwip.c b/src/lstack/core/lstack_lwip.c index 3f76424..b79cdef 100644 --- a/src/lstack/core/lstack_lwip.c +++ b/src/lstack/core/lstack_lwip.c @@ -636,7 +636,7 @@ static inline void notice_stack_send(struct lwip_sock *sock, int32_t fd, int32_t { // 2: call_num >= 2, don't need add new rpc send if (__atomic_load_n(&sock->call_num, __ATOMIC_ACQUIRE) < 2) { - while (rpc_call_send(fd, NULL, len, flags) < 0) { + while (rpc_call_send(&sock->stack->rpc_queue, fd, NULL, len, flags) < 0) { usleep(1000); // 1000: wait 1ms to exec again } __sync_fetch_and_add(&sock->call_num, 1); diff --git a/src/lstack/core/lstack_protocol_stack.c b/src/lstack/core/lstack_protocol_stack.c index 8b99e82..f63fcb0 100644 --- a/src/lstack/core/lstack_protocol_stack.c +++ b/src/lstack/core/lstack_protocol_stack.c @@ -465,7 +465,10 @@ int stack_polling(uint32_t wakeup_tick) uint32_t read_connect_number = cfg->read_connect_number; struct protocol_stack *stack = get_protocol_stack(); - force_quit = poll_rpc_msg(stack, rpc_number); + /* 2: one dfx consumes two rpc */ + rpc_poll_msg(&stack->dfx_rpc_queue, 2); + force_quit = rpc_poll_msg(&stack->rpc_queue, rpc_number); + gazelle_eth_dev_poll(stack, use_ltran_flag, nic_read_number); sys_timer_run(); if (cfg->low_power_mod != 0) { @@ -715,7 +718,7 @@ OUT2: void stack_arp(struct rpc_msg *msg) { struct rte_mbuf *mbuf = (struct rte_mbuf *)msg->args[MSG_ARG_0].p; - struct protocol_stack *stack = (struct protocol_stack*)msg->args[MSG_ARG_1].p; + struct protocol_stack *stack = get_protocol_stack(); eth_dev_recv(mbuf, stack); } @@ -893,7 +896,7 @@ void stack_send(struct rpc_msg *msg) { int32_t fd = msg->args[MSG_ARG_0].i; size_t len = msg->args[MSG_ARG_1].size; - struct protocol_stack *stack = (struct protocol_stack *)msg->args[MSG_ARG_3].p; + struct protocol_stack *stack = get_protocol_stack(); int replenish_again; struct lwip_sock *sock = get_socket(fd); @@ -947,7 +950,7 @@ void stack_broadcast_arp(struct rte_mbuf *mbuf, struct protocol_stack *cur_stack } copy_mbuf(mbuf_copy, mbuf); - ret = rpc_call_arp(stack, mbuf_copy); + ret = rpc_call_arp(&stack->rpc_queue, mbuf_copy); if (ret != 0) { return; } @@ -971,7 +974,7 @@ void stack_broadcast_clean_epoll(struct wakeup_poll *wakeup) for (int32_t i = 0; i < stack_group->stack_num; i++) { stack = stack_group->stacks[i]; - rpc_call_clean_epoll(stack, wakeup); + rpc_call_clean_epoll(&stack->rpc_queue, wakeup); } } @@ -985,17 +988,11 @@ void stack_clean_epoll(struct rpc_msg *msg) void stack_mempool_size(struct rpc_msg *msg) { - struct protocol_stack *stack = (struct protocol_stack*)msg->args[MSG_ARG_0].p; + struct protocol_stack *stack = get_protocol_stack(); msg->result = rte_mempool_avail_count(stack->rxtx_mbuf_pool); } -void stack_rpcpool_size(struct rpc_msg *msg) -{ - struct rpc_msg_pool *rpc_mem_pool = (struct rpc_msg_pool*)msg->args[MSG_ARG_0].p; - msg->result = rte_mempool_avail_count(rpc_mem_pool->mempool); -} - void stack_create_shadow_fd(struct rpc_msg *msg) { int32_t fd = msg->args[MSG_ARG_0].i; @@ -1049,8 +1046,8 @@ void stack_create_shadow_fd(struct rpc_msg *msg) void stack_replenish_sendring(struct rpc_msg *msg) { - struct protocol_stack *stack = (struct protocol_stack *)msg->args[MSG_ARG_0].p; - struct lwip_sock *sock = (struct lwip_sock *)msg->args[MSG_ARG_1].p; + struct protocol_stack *stack = get_protocol_stack(); + struct lwip_sock *sock = (struct lwip_sock *)msg->args[MSG_ARG_0].p; msg->result = do_lwip_replenish_sendring(stack, sock); } @@ -1070,7 +1067,7 @@ void stack_get_connnum(struct rpc_msg *msg) void stack_recvlist_count(struct rpc_msg *msg) { - struct protocol_stack *stack = (struct protocol_stack *)msg->args[MSG_ARG_0].p; + struct protocol_stack *stack = get_protocol_stack(); struct list_node *list = &stack->recv_list; uint32_t count = 0; struct list_node *node; @@ -1086,16 +1083,16 @@ void stack_recvlist_count(struct rpc_msg *msg) /* when fd is listenfd, listenfd of all protocol stack thread will be closed */ int32_t stack_broadcast_close(int32_t fd) { - struct lwip_sock *sock = get_socket(fd); int32_t ret = 0; - + struct lwip_sock *sock = get_socket(fd); + struct protocol_stack *stack = get_protocol_stack_by_fd(fd); if (sock == NULL) { - return -1; + GAZELLE_RETURN(EBADF); } do { sock = sock->listen_next; - if (rpc_call_close(fd)) { + if (stack == NULL || rpc_call_close(&stack->rpc_queue, fd)) { ret = -1; } @@ -1103,7 +1100,8 @@ int32_t stack_broadcast_close(int32_t fd) break; } fd = sock->conn->callback_arg.socket; - } while (sock); + stack = get_protocol_stack_by_fd(fd); + } while (1); return ret; } @@ -1112,13 +1110,14 @@ int stack_broadcast_shutdown(int fd, int how) { int32_t ret = 0; struct lwip_sock *sock = get_socket(fd); + struct protocol_stack *stack = get_protocol_stack_by_fd(fd); if (sock == NULL) { - return -1; + GAZELLE_RETURN(EBADF); } do { sock = sock->listen_next; - if (rpc_call_shutdown(fd, how)) { + if (stack == NULL || rpc_call_shutdown(&stack->rpc_queue, fd, how)) { ret = -1; } @@ -1126,7 +1125,8 @@ int stack_broadcast_shutdown(int fd, int how) break; } fd = sock->conn->callback_arg.socket; - } while (sock); + stack = get_protocol_stack_by_fd(fd); + } while (1); return ret; } @@ -1134,7 +1134,11 @@ int stack_broadcast_shutdown(int fd, int how) /* choice one stack listen */ int32_t stack_single_listen(int32_t fd, int32_t backlog) { - return rpc_call_listen(fd, backlog); + struct protocol_stack *stack = get_protocol_stack_by_fd(fd); + if (stack == NULL) { + GAZELLE_RETURN(EBADF); + } + return rpc_call_listen(&stack->rpc_queue, fd, backlog); } /* listen sync to all protocol stack thread, so that any protocol stack thread can build connect */ @@ -1153,12 +1157,12 @@ int32_t stack_broadcast_listen(int32_t fd, int32_t backlog) int32_t ret, clone_fd; struct lwip_sock *sock = get_socket(fd); - if (sock == NULL) { - LSTACK_LOG(ERR, LSTACK, "tid %ld, %d get sock null\n", get_stack_tid(), fd); - GAZELLE_RETURN(EINVAL); + if (sock == NULL || cur_stack == NULL) { + LSTACK_LOG(ERR, LSTACK, "tid %ld, %d get sock null or stack null\n", get_stack_tid(), fd); + GAZELLE_RETURN(EBADF); } - ret = rpc_call_getsockname(fd, (struct sockaddr *)&addr, &addr_len); + ret = rpc_call_getsockname(&cur_stack->rpc_queue, fd, (struct sockaddr *)&addr, &addr_len); if (ret != 0) { return ret; } @@ -1172,7 +1176,7 @@ int32_t stack_broadcast_listen(int32_t fd, int32_t backlog) continue; } if (stack != cur_stack) { - clone_fd = rpc_call_shadow_fd(stack, fd, (struct sockaddr *)&addr, addr_len); + clone_fd = rpc_call_shadow_fd(&stack->rpc_queue, fd, (struct sockaddr *)&addr, addr_len); if (clone_fd < 0) { stack_broadcast_close(fd); return clone_fd; @@ -1187,7 +1191,7 @@ int32_t stack_broadcast_listen(int32_t fd, int32_t backlog) get_socket_by_fd(clone_fd)->conn->is_master_fd = 0; } - ret = rpc_call_listen(clone_fd, backlog); + ret = rpc_call_listen(&stack->rpc_queue, clone_fd, backlog); if (ret < 0) { stack_broadcast_close(fd); return ret; @@ -1234,7 +1238,11 @@ static void inline del_accept_in_event(struct lwip_sock *sock) /* choice one stack bind */ int32_t stack_single_bind(int32_t fd, const struct sockaddr *name, socklen_t namelen) { - return rpc_call_bind(fd, name, namelen); + struct protocol_stack *stack = get_protocol_stack_by_fd(fd); + if (stack == NULL) { + GAZELLE_RETURN(EBADF); + } + return rpc_call_bind(&stack->rpc_queue, fd, name, namelen); } /* bind sync to all protocol stack thread, so that any protocol stack thread can build connect */ @@ -1245,12 +1253,12 @@ int32_t stack_broadcast_bind(int32_t fd, const struct sockaddr *name, socklen_t int32_t ret, clone_fd; struct lwip_sock *sock = get_socket(fd); - if (sock == NULL) { - LSTACK_LOG(ERR, LSTACK, "tid %ld, %d get sock null\n", get_stack_tid(), fd); - GAZELLE_RETURN(EINVAL); + if (sock == NULL || cur_stack == NULL) { + LSTACK_LOG(ERR, LSTACK, "tid %ld, %d get sock null or stack null\n", get_stack_tid(), fd); + GAZELLE_RETURN(EBADF); } - ret = rpc_call_bind(fd, name, namelen); + ret = rpc_call_bind(&cur_stack->rpc_queue, fd, name, namelen); if (ret < 0) { close(fd); return ret; @@ -1260,7 +1268,7 @@ int32_t stack_broadcast_bind(int32_t fd, const struct sockaddr *name, socklen_t for (int32_t i = 0; i < stack_group->stack_num; ++i) { stack = stack_group->stacks[i]; if (stack != cur_stack) { - clone_fd = rpc_call_shadow_fd(stack, fd, name, namelen); + clone_fd = rpc_call_shadow_fd(&stack->rpc_queue, fd, name, namelen); if (clone_fd < 0) { stack_broadcast_close(fd); return clone_fd; @@ -1276,9 +1284,9 @@ int32_t stack_broadcast_accept4(int32_t fd, struct sockaddr *addr, socklen_t *ad int32_t ret = -1; struct lwip_sock *min_sock = NULL; struct lwip_sock *sock = get_socket(fd); + struct protocol_stack *stack = NULL; if (sock == NULL) { - errno = EINVAL; - return -1; + GAZELLE_RETURN(EBADF); } if (netconn_is_nonblocking(sock->conn)) { @@ -1290,7 +1298,11 @@ int32_t stack_broadcast_accept4(int32_t fd, struct sockaddr *addr, socklen_t *ad } if (min_sock && min_sock->conn) { - ret = rpc_call_accept(min_sock->conn->callback_arg.socket, addr, addrlen, flags); + stack = get_protocol_stack_by_fd(min_sock->conn->callback_arg.socket); + if (stack == NULL) { + GAZELLE_RETURN(EBADF); + } + ret = rpc_call_accept(&stack->rpc_queue, min_sock->conn->callback_arg.socket, addr, addrlen, flags); } if (min_sock && min_sock->wakeup && min_sock->wakeup->type == WAKEUP_EPOLL) { @@ -1344,7 +1356,7 @@ void stack_group_exit(void) } if (stack != stack_group->stacks[i]) { - rpc_call_stack_exit(stack_group->stacks[i]); + rpc_call_stack_exit(&stack_group->stacks[i]->rpc_queue); } } diff --git a/src/lstack/core/lstack_stack_stat.c b/src/lstack/core/lstack_stack_stat.c index 23571b4..01ac6fb 100644 --- a/src/lstack/core/lstack_stack_stat.c +++ b/src/lstack/core/lstack_stack_stat.c @@ -175,20 +175,17 @@ static void get_stack_stats(struct gazelle_stack_dfx_data *dfx, struct protocol_ get_wakeup_stat(stack_group, stack, &dfx->data.pkts.wakeup_stat); - dfx->data.pkts.call_alloc_fail = stack_group->call_alloc_fail; + dfx->data.pkts.call_alloc_fail = rpc_stats_get()->call_alloc_fail; - int32_t rpc_call_result = rpc_call_msgcnt(stack); + int32_t rpc_call_result = rpc_msgcnt(&stack->rpc_queue); dfx->data.pkts.call_msg_cnt = (rpc_call_result < 0) ? 0 : rpc_call_result; - rpc_call_result = rpc_call_mbufpoolsize(stack); + rpc_call_result = rpc_call_mbufpoolsize(&stack->dfx_rpc_queue); dfx->data.pkts.mbufpool_avail_cnt = (rpc_call_result < 0) ? 0 : rpc_call_result; - rpc_call_result = rpc_call_recvlistcnt(stack); + rpc_call_result = rpc_call_recvlistcnt(&stack->dfx_rpc_queue); dfx->data.pkts.recv_list_cnt = (rpc_call_result < 0) ? 0 : rpc_call_result; - rpc_call_result = rpc_call_rpcpool_size(stack); - dfx->data.pkts.rpcpool_avail_cnt = (rpc_call_result < 0) ? 0 : rpc_call_result; - dfx->data.pkts.conn_num = stack->conn_num; } @@ -219,9 +216,10 @@ static void get_stack_dfx_data(struct gazelle_stack_dfx_data *dfx, struct protoc } break; case GAZELLE_STAT_LSTACK_SHOW_CONN: - rpc_call_result = rpc_call_conntable(stack, dfx->data.conn.conn_list, GAZELLE_LSTACK_MAX_CONN); + rpc_call_result = rpc_call_conntable(&stack->dfx_rpc_queue, dfx->data.conn.conn_list, + GAZELLE_LSTACK_MAX_CONN); dfx->data.conn.conn_num = (rpc_call_result < 0) ? 0 : rpc_call_result; - rpc_call_result = rpc_call_connnum(stack); + rpc_call_result = rpc_call_connnum(&stack->dfx_rpc_queue); dfx->data.conn.total_conn_num = (rpc_call_result < 0) ? 0 : rpc_call_result; break; case GAZELLE_STAT_LSTACK_SHOW_LATENCY: @@ -296,7 +294,7 @@ int handle_stack_cmd(int fd, enum GAZELLE_STAT_MODE stat_mode) } dfx.tid = stack->tid; - dfx.stack_id = i; + dfx.stack_id = i; if (i == stack_group->stack_num - 1) { dfx.eof = 1; } diff --git a/src/lstack/core/lstack_thread_rpc.c b/src/lstack/core/lstack_thread_rpc.c index 2af30d7..1fdb037 100644 --- a/src/lstack/core/lstack_thread_rpc.c +++ b/src/lstack/core/lstack_thread_rpc.c @@ -9,21 +9,20 @@ * PURPOSE. * See the Mulan PSL v2 for more details. */ -#include <sys/types.h> -#include <stdatomic.h> #include <lwip/sockets.h> -#include <lwipsock.h> #include <rte_mempool.h> #include "lstack_log.h" -#include "lstack_lwip.h" -#include "lstack_protocol_stack.h" -#include "lstack_control_plane.h" -#include "gazelle_base_func.h" #include "lstack_dpdk.h" +#include "lstack_rpc_proc.h" #include "lstack_thread_rpc.h" static PER_THREAD struct rpc_msg_pool *g_rpc_pool = NULL; +static struct rpc_stats g_rpc_stats; +struct rpc_stats *rpc_stats_get(void) +{ + return &g_rpc_stats; +} static inline __attribute__((always_inline)) struct rpc_msg *get_rpc_msg(struct rpc_msg_pool *rpc_pool) { @@ -37,33 +36,29 @@ static inline __attribute__((always_inline)) struct rpc_msg *get_rpc_msg(struct return msg; } -static struct rpc_msg *rpc_msg_alloc(struct protocol_stack *stack, rpc_msg_func func) +static struct rpc_msg *rpc_msg_alloc(rpc_msg_func func) { struct rpc_msg *msg = NULL; - if (stack == NULL) { - return NULL; - } - if (g_rpc_pool == NULL) { g_rpc_pool = calloc(1, sizeof(struct rpc_msg_pool)); if (g_rpc_pool == NULL) { LSTACK_LOG(INFO, LSTACK, "g_rpc_pool calloc failed\n"); - get_protocol_stack_group()->call_alloc_fail++; + g_rpc_stats.call_alloc_fail++; return NULL; } g_rpc_pool->mempool = create_mempool("rpc_pool", RPC_MSG_MAX, sizeof(struct rpc_msg), 0, rte_gettid()); if (g_rpc_pool->mempool == NULL) { - get_protocol_stack_group()->call_alloc_fail++; + g_rpc_stats.call_alloc_fail++; return NULL; } } msg = get_rpc_msg(g_rpc_pool); if (msg == NULL) { - get_protocol_stack_group()->call_alloc_fail++; + g_rpc_stats.call_alloc_fail++; return NULL; } msg->rpcpool = g_rpc_pool; @@ -75,7 +70,7 @@ static struct rpc_msg *rpc_msg_alloc(struct protocol_stack *stack, rpc_msg_func return msg; } -static inline __attribute__((always_inline)) int32_t rpc_sync_call(lockless_queue *queue, struct rpc_msg *msg) +static inline __attribute__((always_inline)) int32_t rpc_sync_call(rpc_queue *queue, struct rpc_msg *msg) { int32_t ret; @@ -90,13 +85,18 @@ static inline __attribute__((always_inline)) int32_t rpc_sync_call(lockless_queu return ret; } -int poll_rpc_msg(struct protocol_stack *stack, uint32_t max_num) +int32_t rpc_msgcnt(rpc_queue *queue) +{ + return lockless_queue_count(queue); +} + +int rpc_poll_msg(rpc_queue *queue, uint32_t max_num) { int force_quit = 0; struct rpc_msg *msg = NULL; while (max_num--) { - lockless_queue_node *node = lockless_queue_mpsc_pop(&stack->rpc_queue); + lockless_queue_node *node = lockless_queue_mpsc_pop(queue); if (node == NULL) { break; } @@ -106,7 +106,7 @@ int poll_rpc_msg(struct protocol_stack *stack, uint32_t max_num) if (msg->func) { msg->func(msg); } else { - stack->stats.call_null++; + g_rpc_stats.call_null++; } if (msg->func == stack_exit_by_rpc) { @@ -127,9 +127,9 @@ int poll_rpc_msg(struct protocol_stack *stack, uint32_t max_num) return force_quit; } -int32_t rpc_call_conntable(struct protocol_stack *stack, void *conn_table, uint32_t max_conn) +int32_t rpc_call_conntable(rpc_queue *queue, void *conn_table, uint32_t max_conn) { - struct rpc_msg *msg = rpc_msg_alloc(stack, stack_get_conntable); + struct rpc_msg *msg = rpc_msg_alloc(stack_get_conntable); if (msg == NULL) { return -1; } @@ -137,22 +137,22 @@ int32_t rpc_call_conntable(struct protocol_stack *stack, void *conn_table, uint3 msg->args[MSG_ARG_0].p = conn_table; msg->args[MSG_ARG_1].u = max_conn; - return rpc_sync_call(&stack->rpc_queue, msg); + return rpc_sync_call(queue, msg); } -int32_t rpc_call_connnum(struct protocol_stack *stack) +int32_t rpc_call_connnum(rpc_queue *queue) { - struct rpc_msg *msg = rpc_msg_alloc(stack, stack_get_connnum); + struct rpc_msg *msg = rpc_msg_alloc(stack_get_connnum); if (msg == NULL) { return -1; } - return rpc_sync_call(&stack->rpc_queue, msg); + return rpc_sync_call(queue, msg); } -int32_t rpc_call_shadow_fd(struct protocol_stack *stack, int32_t fd, const struct sockaddr *addr, socklen_t addrlen) +int32_t rpc_call_shadow_fd(rpc_queue *queue, int32_t fd, const struct sockaddr *addr, socklen_t addrlen) { - struct rpc_msg *msg = rpc_msg_alloc(stack, stack_create_shadow_fd); + struct rpc_msg *msg = rpc_msg_alloc(stack_create_shadow_fd); if (msg == NULL) { return -1; } @@ -161,100 +161,67 @@ int32_t rpc_call_shadow_fd(struct protocol_stack *stack, int32_t fd, const struc msg->args[MSG_ARG_1].cp = addr; msg->args[MSG_ARG_2].socklen = addrlen; - return rpc_sync_call(&stack->rpc_queue, msg); -} - -static void rpc_msgcnt(struct rpc_msg *msg) -{ - struct protocol_stack *stack = get_protocol_stack(); - msg->result = lockless_queue_count(&stack->rpc_queue); -} - -int32_t rpc_call_msgcnt(struct protocol_stack *stack) -{ - struct rpc_msg *msg = rpc_msg_alloc(stack, rpc_msgcnt); - if (msg == NULL) { - return -1; - } - - return rpc_sync_call(&stack->rpc_queue, msg); + return rpc_sync_call(queue, msg); } -int32_t rpc_call_thread_regphase1(struct protocol_stack *stack, void *conn) +int32_t rpc_call_thread_regphase1(rpc_queue *queue, void *conn) { - struct rpc_msg *msg = rpc_msg_alloc(stack, thread_register_phase1); + struct rpc_msg *msg = rpc_msg_alloc(thread_register_phase1); if (msg == NULL) { return -1; } msg->args[MSG_ARG_0].p = conn; - return rpc_sync_call(&stack->rpc_queue, msg); + return rpc_sync_call(queue, msg); } -int32_t rpc_call_thread_regphase2(struct protocol_stack *stack, void *conn) +int32_t rpc_call_thread_regphase2(rpc_queue *queue, void *conn) { - struct rpc_msg *msg = rpc_msg_alloc(stack, thread_register_phase2); + struct rpc_msg *msg = rpc_msg_alloc(thread_register_phase2); if (msg == NULL) { return -1; } msg->args[MSG_ARG_0].p = conn; - return rpc_sync_call(&stack->rpc_queue, msg); + return rpc_sync_call(queue, msg); } -int32_t rpc_call_mbufpoolsize(struct protocol_stack *stack) +int32_t rpc_call_mbufpoolsize(rpc_queue *queue) { - struct rpc_msg *msg = rpc_msg_alloc(stack, stack_mempool_size); + struct rpc_msg *msg = rpc_msg_alloc(stack_mempool_size); if (msg == NULL) { return -1; } - msg->args[MSG_ARG_0].p = stack; - - return rpc_sync_call(&stack->rpc_queue, msg); -} - -int32_t rpc_call_rpcpool_size(struct protocol_stack *stack) -{ - struct rpc_msg *msg = rpc_msg_alloc(stack, stack_rpcpool_size); - if (msg == NULL) { - return -1; - } - msg->args[MSG_ARG_0].p = g_rpc_pool; - return rpc_sync_call(&stack->rpc_queue, msg); + return rpc_sync_call(queue, msg); } -int32_t rpc_call_recvlistcnt(struct protocol_stack *stack) +int32_t rpc_call_recvlistcnt(rpc_queue *queue) { - struct rpc_msg *msg = rpc_msg_alloc(stack, stack_recvlist_count); + struct rpc_msg *msg = rpc_msg_alloc(stack_recvlist_count); if (msg == NULL) { return -1; } - msg->args[MSG_ARG_0].p = stack; - - return rpc_sync_call(&stack->rpc_queue, msg); + return rpc_sync_call(queue, msg); } -int32_t rpc_call_arp(struct protocol_stack *stack, struct rte_mbuf *mbuf) +int32_t rpc_call_arp(rpc_queue *queue, void *mbuf) { - struct rpc_msg *msg = rpc_msg_alloc(stack, stack_arp); + struct rpc_msg *msg = rpc_msg_alloc(stack_arp); if (msg == NULL) { return -1; } msg->sync_flag = 0; msg->args[MSG_ARG_0].p = mbuf; - msg->args[MSG_ARG_1].p = stack; - rpc_call(&stack->rpc_queue, msg); + rpc_call(queue, msg); return 0; } -int32_t rpc_call_socket(int32_t domain, int32_t type, int32_t protocol) +int32_t rpc_call_socket(rpc_queue *queue, int32_t domain, int32_t type, int32_t protocol) { - struct protocol_stack *stack = get_bind_protocol_stack(); - - struct rpc_msg *msg = rpc_msg_alloc(stack, stack_socket); + struct rpc_msg *msg = rpc_msg_alloc(stack_socket); if (msg == NULL) { return -1; } @@ -263,39 +230,35 @@ int32_t rpc_call_socket(int32_t domain, int32_t type, int32_t protocol) msg->args[MSG_ARG_1].i = type; msg->args[MSG_ARG_2].i = protocol; - return rpc_sync_call(&stack->rpc_queue, msg); + return rpc_sync_call(queue, msg); } -int32_t rpc_call_close(int fd) +int32_t rpc_call_close(rpc_queue *queue, int fd) { - struct protocol_stack *stack = get_protocol_stack_by_fd(fd); - struct rpc_msg *msg = rpc_msg_alloc(stack, stack_close); + struct rpc_msg *msg = rpc_msg_alloc(stack_close); if (msg == NULL) { return -1; } msg->args[MSG_ARG_0].i = fd; - return rpc_sync_call(&stack->rpc_queue, msg); + return rpc_sync_call(queue, msg); } -int32_t rpc_call_stack_exit(struct protocol_stack *stack) +int32_t rpc_call_stack_exit(rpc_queue *queue) { - struct rpc_msg *msg = rpc_msg_alloc(stack, stack_exit_by_rpc); + struct rpc_msg *msg = rpc_msg_alloc(stack_exit_by_rpc); if (msg == NULL) { - LSTACK_LOG(INFO, LSTACK, "rpc msg alloc failed\n"); return -1; } - rpc_call(&stack->rpc_queue, msg); + rpc_call(queue, msg); return 0; } -int32_t rpc_call_shutdown(int fd, int how) +int32_t rpc_call_shutdown(rpc_queue *queue, int fd, int how) { - struct protocol_stack *stack = get_protocol_stack_by_fd(fd); - - struct rpc_msg *msg = rpc_msg_alloc(stack, stack_shutdown); + struct rpc_msg *msg = rpc_msg_alloc(stack_shutdown); if (msg == NULL) { return -1; } @@ -303,25 +266,24 @@ int32_t rpc_call_shutdown(int fd, int how) msg->args[MSG_ARG_0].i = fd; msg->args[MSG_ARG_1].i = how; - return rpc_sync_call(&stack->rpc_queue, msg); + return rpc_sync_call(queue, msg); } -void rpc_call_clean_epoll(struct protocol_stack *stack, struct wakeup_poll *wakeup) +void rpc_call_clean_epoll(rpc_queue *queue, void *wakeup) { - struct rpc_msg *msg = rpc_msg_alloc(stack, stack_clean_epoll); + struct rpc_msg *msg = rpc_msg_alloc(stack_clean_epoll); if (msg == NULL) { return; } msg->args[MSG_ARG_0].p = wakeup; - rpc_sync_call(&stack->rpc_queue, msg); + rpc_sync_call(queue, msg); } -int32_t rpc_call_bind(int32_t fd, const struct sockaddr *addr, socklen_t addrlen) +int32_t rpc_call_bind(rpc_queue *queue, int32_t fd, const struct sockaddr *addr, socklen_t addrlen) { - struct protocol_stack *stack = get_protocol_stack_by_fd(fd); - struct rpc_msg *msg = rpc_msg_alloc(stack, stack_bind); + struct rpc_msg *msg = rpc_msg_alloc(stack_bind); if (msg == NULL) { return -1; } @@ -330,13 +292,12 @@ int32_t rpc_call_bind(int32_t fd, const struct sockaddr *addr, socklen_t addrlen msg->args[MSG_ARG_1].cp = addr; msg->args[MSG_ARG_2].socklen = addrlen; - return rpc_sync_call(&stack->rpc_queue, msg); + return rpc_sync_call(queue, msg); } -int32_t rpc_call_listen(int s, int backlog) +int32_t rpc_call_listen(rpc_queue *queue, int s, int backlog) { - struct protocol_stack *stack = get_protocol_stack_by_fd(s); - struct rpc_msg *msg = rpc_msg_alloc(stack, stack_listen); + struct rpc_msg *msg = rpc_msg_alloc(stack_listen); if (msg == NULL) { return -1; } @@ -344,13 +305,12 @@ int32_t rpc_call_listen(int s, int backlog) msg->args[MSG_ARG_0].i = s; msg->args[MSG_ARG_1].i = backlog; - return rpc_sync_call(&stack->rpc_queue, msg); + return rpc_sync_call(queue, msg); } -int32_t rpc_call_accept(int fd, struct sockaddr *addr, socklen_t *addrlen, int flags) +int32_t rpc_call_accept(rpc_queue *queue, int fd, struct sockaddr *addr, socklen_t *addrlen, int flags) { - struct protocol_stack *stack = get_protocol_stack_by_fd(fd); - struct rpc_msg *msg = rpc_msg_alloc(stack, stack_accept); + struct rpc_msg *msg = rpc_msg_alloc(stack_accept); if (msg == NULL) { return -1; } @@ -360,13 +320,12 @@ int32_t rpc_call_accept(int fd, struct sockaddr *addr, socklen_t *addrlen, int f msg->args[MSG_ARG_2].p = addrlen; msg->args[MSG_ARG_3].i = flags; - return rpc_sync_call(&stack->rpc_queue, msg); + return rpc_sync_call(queue, msg); } -int32_t rpc_call_connect(int fd, const struct sockaddr *addr, socklen_t addrlen) +int32_t rpc_call_connect(rpc_queue *queue, int fd, const struct sockaddr *addr, socklen_t addrlen) { - struct protocol_stack *stack = get_protocol_stack_by_fd(fd); - struct rpc_msg *msg = rpc_msg_alloc(stack, stack_connect); + struct rpc_msg *msg = rpc_msg_alloc(stack_connect); if (msg == NULL) { return -1; } @@ -375,7 +334,7 @@ int32_t rpc_call_connect(int fd, const struct sockaddr *addr, socklen_t addrlen) msg->args[MSG_ARG_1].cp = addr; msg->args[MSG_ARG_2].socklen = addrlen; - int32_t ret = rpc_sync_call(&stack->rpc_queue, msg); + int32_t ret = rpc_sync_call(queue, msg); if (ret < 0) { errno = -ret; return -1; @@ -383,10 +342,9 @@ int32_t rpc_call_connect(int fd, const struct sockaddr *addr, socklen_t addrlen) return ret; } -int32_t rpc_call_getpeername(int fd, struct sockaddr *addr, socklen_t *addrlen) +int32_t rpc_call_getpeername(rpc_queue *queue, int fd, struct sockaddr *addr, socklen_t *addrlen) { - struct protocol_stack *stack = get_protocol_stack_by_fd(fd); - struct rpc_msg *msg = rpc_msg_alloc(stack, stack_getpeername); + struct rpc_msg *msg = rpc_msg_alloc(stack_getpeername); if (msg == NULL) { return -1; } @@ -395,13 +353,12 @@ int32_t rpc_call_getpeername(int fd, struct sockaddr *addr, socklen_t *addrlen) msg->args[MSG_ARG_1].p = addr; msg->args[MSG_ARG_2].p = addrlen; - return rpc_sync_call(&stack->rpc_queue, msg); + return rpc_sync_call(queue, msg); } -int32_t rpc_call_getsockname(int fd, struct sockaddr *addr, socklen_t *addrlen) +int32_t rpc_call_getsockname(rpc_queue *queue, int fd, struct sockaddr *addr, socklen_t *addrlen) { - struct protocol_stack *stack = get_protocol_stack_by_fd(fd); - struct rpc_msg *msg = rpc_msg_alloc(stack, stack_getsockname); + struct rpc_msg *msg = rpc_msg_alloc(stack_getsockname); if (msg == NULL) { return -1; } @@ -410,13 +367,12 @@ int32_t rpc_call_getsockname(int fd, struct sockaddr *addr, socklen_t *addrlen) msg->args[MSG_ARG_1].p = addr; msg->args[MSG_ARG_2].p = addrlen; - return rpc_sync_call(&stack->rpc_queue, msg); + return rpc_sync_call(queue, msg); } -int32_t rpc_call_getsockopt(int fd, int level, int optname, void *optval, socklen_t *optlen) +int32_t rpc_call_getsockopt(rpc_queue *queue, int fd, int level, int optname, void *optval, socklen_t *optlen) { - struct protocol_stack *stack = get_protocol_stack_by_fd(fd); - struct rpc_msg *msg = rpc_msg_alloc(stack, stack_getsockopt); + struct rpc_msg *msg = rpc_msg_alloc(stack_getsockopt); if (msg == NULL) { return -1; } @@ -427,13 +383,12 @@ int32_t rpc_call_getsockopt(int fd, int level, int optname, void *optval, sockle msg->args[MSG_ARG_3].p = optval; msg->args[MSG_ARG_4].p = optlen; - return rpc_sync_call(&stack->rpc_queue, msg); + return rpc_sync_call(queue, msg); } -int32_t rpc_call_setsockopt(int fd, int level, int optname, const void *optval, socklen_t optlen) +int32_t rpc_call_setsockopt(rpc_queue *queue, int fd, int level, int optname, const void *optval, socklen_t optlen) { - struct protocol_stack *stack = get_protocol_stack_by_fd(fd); - struct rpc_msg *msg = rpc_msg_alloc(stack, stack_setsockopt); + struct rpc_msg *msg = rpc_msg_alloc(stack_setsockopt); if (msg == NULL) { return -1; } @@ -444,13 +399,12 @@ int32_t rpc_call_setsockopt(int fd, int level, int optname, const void *optval, msg->args[MSG_ARG_3].cp = optval; msg->args[MSG_ARG_4].socklen = optlen; - return rpc_sync_call(&stack->rpc_queue, msg); + return rpc_sync_call(queue, msg); } -int32_t rpc_call_fcntl(int fd, int cmd, long val) +int32_t rpc_call_fcntl(rpc_queue *queue, int fd, int cmd, long val) { - struct protocol_stack *stack = get_protocol_stack_by_fd(fd); - struct rpc_msg *msg = rpc_msg_alloc(stack, stack_fcntl); + struct rpc_msg *msg = rpc_msg_alloc(stack_fcntl); if (msg == NULL) { return -1; } @@ -459,13 +413,12 @@ int32_t rpc_call_fcntl(int fd, int cmd, long val) msg->args[MSG_ARG_1].i = cmd; msg->args[MSG_ARG_2].l = val; - return rpc_sync_call(&stack->rpc_queue, msg); + return rpc_sync_call(queue, msg); } -int32_t rpc_call_ioctl(int fd, long cmd, void *argp) +int32_t rpc_call_ioctl(rpc_queue *queue, int fd, long cmd, void *argp) { - struct protocol_stack *stack = get_protocol_stack_by_fd(fd); - struct rpc_msg *msg = rpc_msg_alloc(stack, stack_ioctl); + struct rpc_msg *msg = rpc_msg_alloc(stack_ioctl); if (msg == NULL) { return -1; } @@ -474,27 +427,24 @@ int32_t rpc_call_ioctl(int fd, long cmd, void *argp) msg->args[MSG_ARG_1].l = cmd; msg->args[MSG_ARG_2].p = argp; - return rpc_sync_call(&stack->rpc_queue, msg); + return rpc_sync_call(queue, msg); } -int32_t rpc_call_replenish(struct protocol_stack *stack, struct lwip_sock *sock) +int32_t rpc_call_replenish(rpc_queue *queue, void *sock) { - struct rpc_msg *msg = rpc_msg_alloc(stack, stack_replenish_sendring); + struct rpc_msg *msg = rpc_msg_alloc(stack_replenish_sendring); if (msg == NULL) { return -1; } - msg->args[MSG_ARG_0].p = stack; - msg->args[MSG_ARG_1].p = sock; + msg->args[MSG_ARG_0].p = sock; - return rpc_sync_call(&stack->rpc_queue, msg); + return rpc_sync_call(queue, msg); } -int32_t rpc_call_send(int fd, const void *buf, size_t len, int flags) +int32_t rpc_call_send(rpc_queue *queue, int fd, const void *buf, size_t len, int flags) { - struct protocol_stack *stack = get_protocol_stack_by_fd(fd); - - struct rpc_msg *msg = rpc_msg_alloc(stack, stack_send); + struct rpc_msg *msg = rpc_msg_alloc(stack_send); if (msg == NULL) { return -1; } @@ -502,10 +452,9 @@ int32_t rpc_call_send(int fd, const void *buf, size_t len, int flags) msg->args[MSG_ARG_0].i = fd; msg->args[MSG_ARG_1].size = len; msg->args[MSG_ARG_2].i = flags; - msg->args[MSG_ARG_3].p = stack; msg->sync_flag = 0; - rpc_call(&stack->rpc_queue, msg); + rpc_call(queue, msg); return 0; } diff --git a/src/lstack/include/lstack_control_plane.h b/src/lstack/include/lstack_control_plane.h index aed5443..548d725 100644 --- a/src/lstack/include/lstack_control_plane.h +++ b/src/lstack/include/lstack_control_plane.h @@ -23,14 +23,11 @@ enum vdev_request { VDEV_NONE, }; -struct rpc_msg; int client_reg_thrd_ring(void); int32_t control_init_client(bool is_reconnect); void control_client_thread(void *arg); void control_server_thread(void *arg); bool get_register_state(void); -void thread_register_phase1(struct rpc_msg *msg); -void thread_register_phase2(struct rpc_msg *msg); void control_fd_close(void); void delete_primary_path(void); diff --git a/src/lstack/include/lstack_protocol_stack.h b/src/lstack/include/lstack_protocol_stack.h index c681547..7489f2a 100644 --- a/src/lstack/include/lstack_protocol_stack.h +++ b/src/lstack/include/lstack_protocol_stack.h @@ -21,7 +21,7 @@ #include <lwip/netif.h> #include "gazelle_dfx_msg.h" -#include "lstack_lockless_queue.h" +#include "lstack_thread_rpc.h" #include "lstack_ethdev.h" #include "gazelle_opt.h" @@ -59,13 +59,15 @@ struct protocol_stack { volatile bool low_power; bool is_send_thread; - lockless_queue rpc_queue __rte_cache_aligned; - char pad __rte_cache_aligned; + char pad1 __rte_cache_aligned; + rpc_queue dfx_rpc_queue; + rpc_queue rpc_queue; + char pad2 __rte_cache_aligned; /* kernel event thread read/write frequently */ struct epoll_event kernel_events[KERNEL_EPOLL_MAX]; int32_t kernel_event_num; - char pad1 __rte_cache_aligned; + char pad3 __rte_cache_aligned; struct netif netif; struct lstack_dev_ops dev_ops; @@ -149,36 +151,10 @@ void stack_broadcast_clean_epoll(struct wakeup_poll *wakeup); void stack_send_pkts(struct protocol_stack *stack); -struct rpc_msg; struct thread_params { uint16_t queue_id; uint16_t idx; }; -void stack_clean_epoll(struct rpc_msg *msg); -void stack_arp(struct rpc_msg *msg); -void stack_socket(struct rpc_msg *msg); -void stack_close(struct rpc_msg *msg); -void stack_shutdown(struct rpc_msg *msg); -void stack_bind(struct rpc_msg *msg); -void stack_listen(struct rpc_msg *msg); -void stack_accept(struct rpc_msg *msg); -void stack_connect(struct rpc_msg *msg); -void stack_recv(struct rpc_msg *msg); -void stack_getpeername(struct rpc_msg *msg); -void stack_getsockname(struct rpc_msg *msg); -void stack_getsockopt(struct rpc_msg *msg); -void stack_setsockopt(struct rpc_msg *msg); -void stack_fcntl(struct rpc_msg *msg); -void stack_ioctl(struct rpc_msg *msg); -void stack_send(struct rpc_msg *msg); -void stack_mempool_size(struct rpc_msg *msg); -void stack_rpcpool_size(struct rpc_msg *msg); -void stack_create_shadow_fd(struct rpc_msg *msg); -void stack_replenish_sendring(struct rpc_msg *msg); -void stack_get_conntable(struct rpc_msg *msg); -void stack_get_connnum(struct rpc_msg *msg); -void stack_recvlist_count(struct rpc_msg *msg); -void stack_exit_by_rpc(struct rpc_msg *msg); int stack_polling(uint32_t wakeup_tick); #endif diff --git a/src/lstack/include/lstack_rpc_proc.h b/src/lstack/include/lstack_rpc_proc.h new file mode 100644 index 0000000..71f0c58 --- /dev/null +++ b/src/lstack/include/lstack_rpc_proc.h @@ -0,0 +1,46 @@ +/* +* Copyright (c) Huawei Technologies Co., Ltd. 2020-2021. All rights reserved. +* gazelle is licensed under the Mulan PSL v2. +* You can use this software according to the terms and conditions of the Mulan PSL v2. +* You may obtain a copy of Mulan PSL v2 at: +* http://license.coscl.org.cn/MulanPSL2 +* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR +* IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY OR FIT FOR A PARTICULAR +* PURPOSE. +* See the Mulan PSL v2 for more details. +*/ + +#ifndef __GAZELLE_RPC_PROC_H__ +#define __GAZELLE_RPC_PROC_H__ +#include "lstack_thread_rpc.h" + +void stack_clean_epoll(struct rpc_msg *msg); +void stack_arp(struct rpc_msg *msg); +void stack_socket(struct rpc_msg *msg); +void stack_close(struct rpc_msg *msg); +void stack_shutdown(struct rpc_msg *msg); +void stack_bind(struct rpc_msg *msg); +void stack_listen(struct rpc_msg *msg); +void stack_accept(struct rpc_msg *msg); +void stack_connect(struct rpc_msg *msg); +void stack_recv(struct rpc_msg *msg); +void stack_getpeername(struct rpc_msg *msg); +void stack_getsockname(struct rpc_msg *msg); +void stack_getsockopt(struct rpc_msg *msg); +void stack_setsockopt(struct rpc_msg *msg); +void stack_fcntl(struct rpc_msg *msg); +void stack_ioctl(struct rpc_msg *msg); +void stack_send(struct rpc_msg *msg); +void stack_mempool_size(struct rpc_msg *msg); +void stack_rpcpool_size(struct rpc_msg *msg); +void stack_create_shadow_fd(struct rpc_msg *msg); +void stack_replenish_sendring(struct rpc_msg *msg); +void stack_get_conntable(struct rpc_msg *msg); +void stack_get_connnum(struct rpc_msg *msg); +void stack_recvlist_count(struct rpc_msg *msg); +void stack_exit_by_rpc(struct rpc_msg *msg); + +void thread_register_phase1(struct rpc_msg *msg); +void thread_register_phase2(struct rpc_msg *msg); + +#endif diff --git a/src/lstack/include/lstack_thread_rpc.h b/src/lstack/include/lstack_thread_rpc.h index 633ef93..30caa66 100644 --- a/src/lstack/include/lstack_thread_rpc.h +++ b/src/lstack/include/lstack_thread_rpc.h @@ -28,6 +28,12 @@ #define RPC_MSG_MAX 4096 #define RPC_MSG_MASK (RPC_MSG_MAX - 1) +typedef struct lockless_queue rpc_queue; + +struct rpc_stats { + uint16_t call_null; + uint64_t call_alloc_fail; +}; struct rpc_msg; typedef void (*rpc_msg_func)(struct rpc_msg *msg); @@ -41,7 +47,9 @@ union rpc_msg_arg { socklen_t socklen; size_t size; }; -struct rpc_msg_pool; +struct rpc_msg_pool { + struct rte_mempool *mempool; +}; struct rpc_msg { pthread_spinlock_t lock; /* msg handler unlock notice sender msg process done */ int8_t sync_flag : 1; @@ -54,44 +62,41 @@ struct rpc_msg { union rpc_msg_arg args[RPM_MSG_ARG_SIZE]; /* resolve by type */ }; -struct rpc_msg_pool { - struct rte_mempool *mempool; -}; +static inline void rpc_queue_init(rpc_queue *queue) +{ + lockless_queue_init(queue); +} -struct protocol_stack; -struct rte_mbuf; -struct wakeup_poll; -struct lwip_sock; -int poll_rpc_msg(struct protocol_stack *stack, uint32_t max_num); -void rpc_call_clean_epoll(struct protocol_stack *stack, struct wakeup_poll *wakeup); -int32_t rpc_call_msgcnt(struct protocol_stack *stack); -int32_t rpc_call_shadow_fd(struct protocol_stack *stack, int32_t fd, const struct sockaddr *addr, socklen_t addrlen); -int32_t rpc_call_recvlistcnt(struct protocol_stack *stack); -int32_t rpc_call_thread_regphase1(struct protocol_stack *stack, void *conn); -int32_t rpc_call_thread_regphase2(struct protocol_stack *stack, void *conn); -int32_t rpc_call_conntable(struct protocol_stack *stack, void *conn_table, uint32_t max_conn); -int32_t rpc_call_connnum(struct protocol_stack *stack); -int32_t rpc_call_arp(struct protocol_stack *stack, struct rte_mbuf *mbuf); -int32_t rpc_call_socket(int32_t domain, int32_t type, int32_t protocol); -int32_t rpc_call_close(int32_t fd); -int32_t rpc_call_shutdown(int fd, int how); -int32_t rpc_call_bind(int32_t fd, const struct sockaddr *addr, socklen_t addrlen); -int32_t rpc_call_listen(int s, int backlog); -int32_t rpc_call_accept(int fd, struct sockaddr *addr, socklen_t *addrlen, int flags); -int32_t rpc_call_connect(int fd, const struct sockaddr *addr, socklen_t addrlen); -int32_t rpc_call_send(int fd, const void *buf, size_t len, int flags); -int32_t rpc_call_getpeername(int fd, struct sockaddr *addr, socklen_t *addrlen); -int32_t rpc_call_getsockname(int fd, struct sockaddr *addr, socklen_t *addrlen); -int32_t rpc_call_getsockopt(int fd, int level, int optname, void *optval, socklen_t *optlen); -int32_t rpc_call_setsockopt(int fd, int level, int optname, const void *optval, socklen_t optlen); -int32_t rpc_call_fcntl(int fd, int cmd, long val); -int32_t rpc_call_ioctl(int fd, long cmd, void *argp); -int32_t rpc_call_replenish(struct protocol_stack *stack, struct lwip_sock *sock); -int32_t rpc_call_mbufpoolsize(struct protocol_stack *stack); -int32_t rpc_call_rpcpool_size(struct protocol_stack *stack); -int32_t rpc_call_stack_exit(struct protocol_stack *stack); +struct rpc_stats *rpc_stats_get(void); +int32_t rpc_msgcnt(rpc_queue *queue); +int rpc_poll_msg(rpc_queue *queue, uint32_t max_num); +void rpc_call_clean_epoll(rpc_queue *queue, void *wakeup); +int32_t rpc_call_shadow_fd(rpc_queue *queue, int32_t fd, const struct sockaddr *addr, socklen_t addrlen); +int32_t rpc_call_recvlistcnt(rpc_queue *queue); +int32_t rpc_call_thread_regphase1(rpc_queue *queue, void *conn); +int32_t rpc_call_thread_regphase2(rpc_queue *queue, void *conn); +int32_t rpc_call_conntable(rpc_queue *queue, void *conn_table, uint32_t max_conn); +int32_t rpc_call_connnum(rpc_queue *queue); +int32_t rpc_call_arp(rpc_queue *queue, void *mbuf); +int32_t rpc_call_socket(rpc_queue *queue, int32_t domain, int32_t type, int32_t protocol); +int32_t rpc_call_close(rpc_queue *queue, int32_t fd); +int32_t rpc_call_shutdown(rpc_queue *queue, int fd, int how); +int32_t rpc_call_bind(rpc_queue *queue, int32_t fd, const struct sockaddr *addr, socklen_t addrlen); +int32_t rpc_call_listen(rpc_queue *queue, int s, int backlog); +int32_t rpc_call_accept(rpc_queue *queue, int fd, struct sockaddr *addr, socklen_t *addrlen, int flags); +int32_t rpc_call_connect(rpc_queue *queue, int fd, const struct sockaddr *addr, socklen_t addrlen); +int32_t rpc_call_send(rpc_queue *queue, int fd, const void *buf, size_t len, int flags); +int32_t rpc_call_getpeername(rpc_queue *queue, int fd, struct sockaddr *addr, socklen_t *addrlen); +int32_t rpc_call_getsockname(rpc_queue *queue, int fd, struct sockaddr *addr, socklen_t *addrlen); +int32_t rpc_call_getsockopt(rpc_queue *queue, int fd, int level, int optname, void *optval, socklen_t *optlen); +int32_t rpc_call_setsockopt(rpc_queue *queue, int fd, int level, int optname, const void *optval, socklen_t optlen); +int32_t rpc_call_fcntl(rpc_queue *queue, int fd, int cmd, long val); +int32_t rpc_call_ioctl(rpc_queue *queue, int fd, long cmd, void *argp); +int32_t rpc_call_replenish(rpc_queue *queue, void *sock); +int32_t rpc_call_mbufpoolsize(rpc_queue *queue); +int32_t rpc_call_stack_exit(rpc_queue *queue); -static inline __attribute__((always_inline)) void rpc_call(lockless_queue *queue, struct rpc_msg *msg) +static inline __attribute__((always_inline)) void rpc_call(rpc_queue *queue, struct rpc_msg *msg) { lockless_queue_mpsc_push(queue, &msg->queue_node); } diff --git a/src/lstack/netif/lstack_ethdev.c b/src/lstack/netif/lstack_ethdev.c index 4d6f620..965de58 100644 --- a/src/lstack/netif/lstack_ethdev.c +++ b/src/lstack/netif/lstack_ethdev.c @@ -529,7 +529,7 @@ void transfer_tcp_to_thread(struct rte_mbuf *mbuf, uint16_t stk_idx) struct protocol_stack *stack = get_protocol_stack_group()->stacks[stk_idx]; int ret = -1; while(ret != 0) { - ret = rpc_call_arp(stack, mbuf); + ret = rpc_call_arp(&stack->rpc_queue, mbuf); printf("transfer_tcp_to_thread, ret : %d \n", ret); } } @@ -550,10 +550,10 @@ void parse_arp_and_transefer(char* buf) } copy_mbuf(mbuf_copy, mbuf); - ret = rpc_call_arp(stack, mbuf_copy); + ret = rpc_call_arp(&stack->rpc_queue, mbuf_copy); while (ret != 0) { - rpc_call_arp(stack, mbuf_copy);; + rpc_call_arp(&stack->rpc_queue, mbuf_copy); } } } diff --git a/src/ltran/ltran_dfx.c b/src/ltran/ltran_dfx.c index 9f12096..bea0dc7 100644 --- a/src/ltran/ltran_dfx.c +++ b/src/ltran/ltran_dfx.c @@ -646,8 +646,7 @@ static void show_lstack_stats(struct gazelle_stack_dfx_data *lstack_stat) printf("call_alloc_fail: %-12"PRIu64" ", lstack_stat->data.pkts.call_alloc_fail); printf("call_null: %-18"PRIu64" \n", lstack_stat->data.pkts.stack_stat.call_null); printf("send_pkts_fail: %-13"PRIu64" ", lstack_stat->data.pkts.stack_stat.send_pkts_fail); - printf("mbuf_pool_freecnt: %-10"PRIu32" ", lstack_stat->data.pkts.mbufpool_avail_cnt); - printf("rpc_pool_freecnt: %-12"PRIu32" \n", lstack_stat->data.pkts.rpcpool_avail_cnt); + printf("mbuf_pool_freecnt: %-10"PRIu32" \n", lstack_stat->data.pkts.mbufpool_avail_cnt); printf("accpet_fail: %-16"PRIu64" ", lstack_stat->data.pkts.stack_stat.accept_fail); printf("sock_rx_drop: %-15"PRIu64" ", lstack_stat->data.pkts.stack_stat.sock_rx_drop); printf("sock_tx_merge: %-16"PRIu64" \n", lstack_stat->data.pkts.stack_stat.sock_tx_merge); -- 2.27.0
Locations
Projects
Search
Status Monitor
Help
Open Build Service
OBS Manuals
API Documentation
OBS Portal
Reporting a Bug
Contact
Mailing List
Forums
Chat (IRC)
Twitter
Open Build Service (OBS)
is an
openSUSE project
.