Projects
openEuler:22.03:LTS:SP3
gazelle
_service:tar_scm_kernel_repo:0137-split-the-flo...
Sign Up
Log In
Username
Password
Overview
Repositories
Revisions
Requests
Users
Attributes
Meta
File _service:tar_scm_kernel_repo:0137-split-the-flow-fules-related-functions-into-separate.patch of Package gazelle
From 409b6155a1ec9324bd68aae97a07e33560c19028 Mon Sep 17 00:00:00 2001 From: jiangheng <jiangheng14@huawei.com> Date: Wed, 21 Feb 2024 08:05:03 +0800 Subject: [PATCH] split the flow fules related functions into separate file --- src/lstack/core/lstack_init.c | 1 + src/lstack/core/lstack_protocol_stack.c | 25 +- src/lstack/include/lstack_ethdev.h | 27 - src/lstack/include/lstack_flow.h | 51 ++ src/lstack/include/lstack_vdev.h | 9 - src/lstack/netif/dir.mk | 2 +- src/lstack/netif/lstack_ethdev.c | 687 +----------------------- src/lstack/netif/lstack_flow.c | 680 +++++++++++++++++++++++ src/lstack/netif/lstack_vdev.c | 1 + 9 files changed, 747 insertions(+), 736 deletions(-) create mode 100644 src/lstack/include/lstack_flow.h create mode 100644 src/lstack/netif/lstack_flow.c diff --git a/src/lstack/core/lstack_init.c b/src/lstack/core/lstack_init.c index 31fd91d..d22a295 100644 --- a/src/lstack/core/lstack_init.c +++ b/src/lstack/core/lstack_init.c @@ -48,6 +48,7 @@ #include "lstack_protocol_stack.h" #include "lstack_preload.h" #include "lstack_wrap.h" +#include "lstack_flow.h" static void check_process_start(void) { diff --git a/src/lstack/core/lstack_protocol_stack.c b/src/lstack/core/lstack_protocol_stack.c index 18e5df7..a545b73 100644 --- a/src/lstack/core/lstack_protocol_stack.c +++ b/src/lstack/core/lstack_protocol_stack.c @@ -454,14 +454,12 @@ int stack_polling(uint32_t wakeup_tick) { int force_quit; struct cfg_params *cfg = get_global_cfg_params(); - uint8_t use_ltran_flag = cfg->use_ltran; #if RTE_VERSION < RTE_VERSION_NUM(23, 11, 0, 0) bool kni_switch = cfg->kni_switch; #endif bool use_sockmap = cfg->use_sockmap; bool stack_mode_rtc = cfg->stack_mode_rtc; uint32_t rpc_number = cfg->rpc_number; - uint32_t nic_read_number = cfg->nic_read_number; uint32_t read_connect_number = cfg->read_connect_number; struct protocol_stack *stack = get_protocol_stack(); @@ -469,7 +467,7 @@ int stack_polling(uint32_t wakeup_tick) rpc_poll_msg(&stack->dfx_rpc_queue, 2); force_quit = rpc_poll_msg(&stack->rpc_queue, rpc_number); - gazelle_eth_dev_poll(stack, use_ltran_flag, nic_read_number); + eth_dev_poll(); sys_timer_run(); if (cfg->low_power_mod != 0) { low_power_idling(stack); @@ -525,10 +523,6 @@ static void* gazelle_stack_thread(void *arg) } sem_post(&g_stack_group.sem_stack_setup); - if (!use_ltran() && queue_id == 0) { - init_listen_and_user_ports(); - } - LSTACK_LOG(INFO, LSTACK, "stack_%02hu init success\n", queue_id); if (get_global_cfg_params()->stack_mode_rtc) { return NULL; @@ -545,12 +539,6 @@ static void* gazelle_stack_thread(void *arg) return NULL; } -static void gazelle_listen_thread(void *arg) -{ - struct cfg_params *cfg_param = get_global_cfg_params(); - recv_pkts_from_other_process(cfg_param->process_idx, arg); -} - int32_t stack_group_init_mempool(void) { struct cfg_params *global_cfg_parmas = get_global_cfg_params(); @@ -611,17 +599,6 @@ int32_t stack_group_init(void) } } - /* run to completion mode does not currently support multiple process */ - if (!use_ltran() && !get_global_cfg_params()->stack_mode_rtc) { - char name[PATH_MAX]; - sem_init(&stack_group->sem_listen_thread, 0, 0); - sprintf_s(name, sizeof(name), "%s", "listen_thread"); - struct sys_thread *thread = sys_thread_new(name, gazelle_listen_thread, - (void*)(&stack_group->sem_listen_thread), 0, 0); - free(thread); - sem_wait(&stack_group->sem_listen_thread); - } - return 0; } diff --git a/src/lstack/include/lstack_ethdev.h b/src/lstack/include/lstack_ethdev.h index 3252906..0c3d906 100644 --- a/src/lstack/include/lstack_ethdev.h +++ b/src/lstack/include/lstack_ethdev.h @@ -16,25 +16,6 @@ #include <rte_eal.h> #include <rte_version.h> -#define INVAILD_PROCESS_IDX 255 - -enum port_type { - PORT_LISTEN, - PORT_CONNECT, -}; - -enum PACKET_TRANSFER_TYPE { - TRANSFER_KERNEL = -1, - TRANSFER_OTHER_THREAD, - TRANSFER_CURRENT_THREAD, -}; - -enum TRANSFER_MESSAGE_RESULT { - CONNECT_ERROR = -2, - REPLY_ERROR = -1, - TRANSFER_SUCESS = 0, -}; - struct protocol_stack; struct rte_mbuf; struct lstack_dev_ops { @@ -44,21 +25,13 @@ struct lstack_dev_ops { int32_t ethdev_init(struct protocol_stack *stack); int32_t eth_dev_poll(void); -int32_t gazelle_eth_dev_poll(struct protocol_stack *stack, uint8_t use_ltran_flag, uint32_t nic_read_number); void eth_dev_recv(struct rte_mbuf *mbuf, struct protocol_stack *stack); -int recv_pkts_from_other_process(int process_index, void* arg); -int32_t check_params_from_primary(void); - #if RTE_VERSION < RTE_VERSION_NUM(23, 11, 0, 0) void kni_handle_rx(uint16_t port_id); void kni_handle_tx(struct rte_mbuf *mbuf); #endif -void delete_user_process_port(uint16_t dst_port, enum port_type type); -void add_user_process_port(uint16_t dst_port, uint8_t process_idx, enum port_type type); -void delete_flow_director(uint32_t dst_ip, uint16_t src_port, uint16_t dst_port); -void config_flow_director(uint16_t queue_id, uint32_t src_ip, uint32_t dst_ip, uint16_t src_port, uint16_t dst_port); void netif_poll(struct netif *netif); #endif /* __GAZELLE_ETHDEV_H__ */ diff --git a/src/lstack/include/lstack_flow.h b/src/lstack/include/lstack_flow.h new file mode 100644 index 0000000..ad35cdf --- /dev/null +++ b/src/lstack/include/lstack_flow.h @@ -0,0 +1,51 @@ +/* +* Copyright (c) Huawei Technologies Co., Ltd. 2020-2021. All rights reserved. +* gazelle is licensed under the Mulan PSL v2. +* You can use this software according to the terms and conditions of the Mulan PSL v2. +* You may obtain a copy of Mulan PSL v2 at: +* http://license.coscl.org.cn/MulanPSL2 +* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR +* IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY OR FIT FOR A PARTICULAR +* PURPOSE. +* See the Mulan PSL v2 for more details. +*/ + +#ifndef __LSTACK_FLOW_H__ +#define __LSTACK_FLOW_H__ + +#include <rte_mbuf.h> + +enum port_type { + PORT_LISTEN, + PORT_CONNECT, +}; + +enum PACKET_TRANSFER_TYPE { + TRANSFER_KERNEL = -1, + TRANSFER_OTHER_THREAD, + TRANSFER_CURRENT_THREAD, +}; + +enum TRANSFER_MESSAGE_RESULT { + CONNECT_ERROR = -2, + REPLY_ERROR = -1, + TRANSFER_SUCESS = 0, +}; + +int distribute_pakages(struct rte_mbuf *mbuf); +void flow_init(void); +int32_t check_params_from_primary(void); + +int recv_pkts_from_other_process(int process_index, void* arg); +void transfer_delete_rule_info_to_process0(uint32_t dst_ip, uint16_t src_port, uint16_t dst_port); +void transfer_create_rule_info_to_process0(uint16_t queue_id, uint32_t src_ip, + uint32_t dst_ip, uint16_t src_port, uint16_t dst_port); +void transfer_add_or_delete_listen_port_to_process0(uint16_t listen_port, uint8_t process_idx, uint8_t is_add); +void transfer_arp_to_other_process(struct rte_mbuf *mbuf); + +void add_user_process_port(uint16_t dst_port, uint8_t process_idx, enum port_type type); +void delete_user_process_port(uint16_t dst_port, enum port_type type); + +void gazelle_listen_thread(void *arg); + +#endif diff --git a/src/lstack/include/lstack_vdev.h b/src/lstack/include/lstack_vdev.h index 007eec7..4e5d191 100644 --- a/src/lstack/include/lstack_vdev.h +++ b/src/lstack/include/lstack_vdev.h @@ -13,19 +13,10 @@ #ifndef _GAZELLE_VDEV_H_ #define _GAZELLE_VDEV_H_ -#include <stdbool.h> - struct lstack_dev_ops; struct gazelle_quintuple; enum reg_ring_type; void vdev_dev_ops_init(struct lstack_dev_ops *dev_ops); int vdev_reg_xmit(enum reg_ring_type type, struct gazelle_quintuple *qtuple); -int recv_pkts_from_other_process(int process_index, void* arg); -void transfer_delete_rule_info_to_process0(uint32_t dst_ip, uint16_t src_port, uint16_t dst_port); -void transfer_create_rule_info_to_process0(uint16_t queue_id, uint32_t src_ip, - uint32_t dst_ip, uint16_t src_port, uint16_t dst_port); -void transfer_add_or_delete_listen_port_to_process0(uint16_t listen_port, uint8_t process_idx, uint8_t is_add); -void init_listen_and_user_ports(); - #endif /* _GAZELLE_VDEV_H_ */ diff --git a/src/lstack/netif/dir.mk b/src/lstack/netif/dir.mk index ec7c4ad..20fb5d6 100644 --- a/src/lstack/netif/dir.mk +++ b/src/lstack/netif/dir.mk @@ -8,5 +8,5 @@ # PURPOSE. # See the Mulan PSL v2 for more details. -SRC = lstack_ethdev.c lstack_vdev.c +SRC = lstack_ethdev.c lstack_vdev.c lstack_flow.c $(eval $(call register_dir, netif, $(SRC))) diff --git a/src/lstack/netif/lstack_ethdev.c b/src/lstack/netif/lstack_ethdev.c index 25c94eb..2e938b0 100644 --- a/src/lstack/netif/lstack_ethdev.c +++ b/src/lstack/netif/lstack_ethdev.c @@ -10,74 +10,36 @@ * See the Mulan PSL v2 for more details. */ -#include <sys/socket.h> -#include <sys/un.h> - #include <rte_eal.h> #include <rte_version.h> #if RTE_VERSION < RTE_VERSION_NUM(23, 11, 0, 0) #include <rte_kni.h> #endif #include <rte_ethdev.h> -#include <rte_malloc.h> -#include <rte_ether.h> -#include <lwip/debug.h> #include <lwip/etharp.h> #include <lwip/ethip6.h> #include <lwip/posix_api.h> #include <netif/ethernet.h> -#include <lwip/tcp.h> -#include <lwip/prot/tcp.h> #include <securec.h> -#include <rte_jhash.h> -#include <uthash.h> +#include "dpdk_common.h" #include "lstack_cfg.h" #include "lstack_vdev.h" #include "lstack_stack_stat.h" #include "lstack_log.h" #include "lstack_dpdk.h" #include "lstack_lwip.h" -#include "dpdk_common.h" #include "lstack_protocol_stack.h" #include "lstack_thread_rpc.h" +#include "lstack_flow.h" #include "lstack_ethdev.h" /* FRAME_MTU + 14byte header */ #define MBUF_MAX_LEN 1514 -#define MAX_PATTERN_NUM 4 -#define MAX_ACTION_NUM 2 -#define FULL_MASK 0xffffffff /* full mask */ -#define EMPTY_MASK 0x0 /* empty mask */ -#define LSTACK_MBUF_LEN 64 -#define TRANSFER_TCP_MUBF_LEN (LSTACK_MBUF_LEN + 3) -#define DELETE_FLOWS_PARAMS_NUM 3 -#define DELETE_FLOWS_PARAMS_LENGTH 30 -#define CREATE_FLOWS_PARAMS_NUM 6 -#define CREATE_FLOWS_PARAMS_LENGTH 60 -#define ADD_OR_DELETE_LISTEN_PORT_PARAMS_LENGTH 25 -#define ADD_OR_DELETE_LISTEN_PORT_PARAMS_NUM 3 -#define REPLY_LEN 10 -#define SUCCESS_REPLY "success" -#define ERROR_REPLY "error" #define PACKET_READ_SIZE 32 -#define GET_LSTACK_NUM 14 -#define GET_LSTACK_NUM_STRING "get_lstack_num" - -#define SERVER_PATH "/var/run/gazelle/server.socket" -#define SPLIT_DELIM "," - -#define UNIX_TCP_PORT_MAX 65535 - -#define IPV4_VERSION_OFFSET 4 -#define IPV4_VERSION 4 - -static uint8_t g_user_ports[UNIX_TCP_PORT_MAX] = {INVAILD_PROCESS_IDX, }; -static uint8_t g_listen_ports[UNIX_TCP_PORT_MAX] = {INVAILD_PROCESS_IDX, }; - void eth_dev_recv(struct rte_mbuf *mbuf, struct protocol_stack *stack) { int32_t ret; @@ -126,636 +88,6 @@ void eth_dev_recv(struct rte_mbuf *mbuf, struct protocol_stack *stack) } } -int32_t eth_dev_poll(void) -{ - uint32_t nr_pkts; - struct cfg_params *cfg = get_global_cfg_params(); - struct protocol_stack *stack = get_protocol_stack(); - - nr_pkts = stack->dev_ops.rx_poll(stack, stack->pkts, cfg->nic_read_number); - if (nr_pkts == 0) { - return 0; - } - - if (!cfg->use_ltran && get_protocol_stack_group()->latency_start) { - uint64_t time_stamp = get_current_time(); - time_stamp_into_mbuf(nr_pkts, stack->pkts, time_stamp); - } - - for (uint32_t i = 0; i < nr_pkts; i++) { - /* copy arp into other stack */ - if (!cfg->use_ltran) { - struct rte_ether_hdr *ethh = rte_pktmbuf_mtod(stack->pkts[i], struct rte_ether_hdr *); - if (unlikely(RTE_BE16(RTE_ETHER_TYPE_ARP) == ethh->ether_type)) { - stack_broadcast_arp(stack->pkts[i], stack); - } - } - - eth_dev_recv(stack->pkts[i], stack); - } - - stack->stats.rx += nr_pkts; - - return nr_pkts; -} - -/* flow rule map */ -#define RULE_KEY_LEN 23 -struct flow_rule { - char rule_key[RULE_KEY_LEN]; - struct rte_flow *flow; - UT_hash_handle hh; -}; - -static uint16_t g_flow_num = 0; -struct flow_rule *g_flow_rules = NULL; -struct flow_rule *find_rule(char *rule_key) -{ - struct flow_rule *fl; - HASH_FIND_STR(g_flow_rules, rule_key, fl); - return fl; -} - -void add_rule(char* rule_key, struct rte_flow *flow) -{ - struct flow_rule *rule; - HASH_FIND_STR(g_flow_rules, rule_key, rule); - if (rule == NULL) { - rule = (struct flow_rule*)malloc(sizeof(struct flow_rule)); - strcpy_s(rule->rule_key, RULE_KEY_LEN, rule_key); - HASH_ADD_STR(g_flow_rules, rule_key, rule); - } - rule->flow = flow; -} - -void delete_rule(char* rule_key) -{ - struct flow_rule *rule = NULL; - HASH_FIND_STR(g_flow_rules, rule_key, rule); - if (rule != NULL) { - HASH_DEL(g_flow_rules, rule); - free(rule); - } -} - -void init_listen_and_user_ports(void) -{ - memset_s(g_user_ports, sizeof(g_user_ports), INVAILD_PROCESS_IDX, sizeof(g_user_ports)); - memset_s(g_listen_ports, sizeof(g_listen_ports), INVAILD_PROCESS_IDX, sizeof(g_listen_ports)); -} - -int transfer_pkt_to_other_process(char *buf, int process_index, int write_len, bool need_reply) -{ - /* other process queue_id */ - struct sockaddr_un serun; - int sockfd; - int ret = 0; - - sockfd = posix_api->socket_fn(AF_UNIX, SOCK_STREAM, 0); - memset_s(&serun, sizeof(serun), 0, sizeof(serun)); - serun.sun_family = AF_UNIX; - sprintf_s(serun.sun_path, PATH_MAX, "%s%d", SERVER_PATH, process_index); - int32_t len = offsetof(struct sockaddr_un, sun_path) + strlen(serun.sun_path); - if (posix_api->connect_fn(sockfd, (struct sockaddr *)&serun, len) < 0) { - return CONNECT_ERROR; - } - posix_api->write_fn(sockfd, buf, write_len); - if (need_reply) { - char reply_message[REPLY_LEN]; - int32_t read_result = posix_api->read_fn(sockfd, reply_message, REPLY_LEN); - if (read_result > 0) { - if (strcmp(reply_message, SUCCESS_REPLY) == 0) { - ret = TRANSFER_SUCESS; - } else if (strcmp(reply_message, ERROR_REPLY) == 0) { - ret = REPLY_ERROR; - } else { - ret = atoi(reply_message); - } - } else { - ret = REPLY_ERROR; - } - } - posix_api->close_fn(sockfd); - - return ret; -} - -int32_t check_params_from_primary(void) -{ - struct cfg_params *cfg = get_global_cfg_params(); - if (cfg->is_primary) { - return 0; - } - // check lstack num - char get_lstack_num[GET_LSTACK_NUM]; - sprintf_s(get_lstack_num, GET_LSTACK_NUM, "%s", GET_LSTACK_NUM_STRING); - int32_t ret = transfer_pkt_to_other_process(get_lstack_num, 0, GET_LSTACK_NUM, true); - if (ret != cfg->num_cpu) { - return -1; - } - return 0; -} - -struct rte_flow *create_flow_director(uint16_t port_id, uint16_t queue_id, - uint32_t src_ip, uint32_t dst_ip, - uint16_t src_port, uint16_t dst_port, - struct rte_flow_error *error) -{ - struct rte_flow_attr attr; - struct rte_flow_item pattern[MAX_PATTERN_NUM]; - struct rte_flow_action action[MAX_ACTION_NUM]; - struct rte_flow *flow = NULL; - struct rte_flow_action_queue queue = { .index = queue_id }; - struct rte_flow_item_ipv4 ip_spec; - struct rte_flow_item_ipv4 ip_mask; - - struct rte_flow_item_tcp tcp_spec; - struct rte_flow_item_tcp tcp_mask; - int res; - - memset_s(pattern, sizeof(pattern), 0, sizeof(pattern)); - memset_s(action, sizeof(action), 0, sizeof(action)); - - /* - * set the rule attribute. - * in this case only ingress packets will be checked. - */ - memset_s(&attr, sizeof(struct rte_flow_attr), 0, sizeof(struct rte_flow_attr)); - attr.ingress = 1; - - /* - * create the action sequence. - * one action only, move packet to queue - */ - action[0].type = RTE_FLOW_ACTION_TYPE_QUEUE; - action[0].conf = &queue; - action[1].type = RTE_FLOW_ACTION_TYPE_END; - - // not limit eth header - pattern[0].type = RTE_FLOW_ITEM_TYPE_ETH; - - // ip header - memset_s(&ip_spec, sizeof(struct rte_flow_item_ipv4), 0, sizeof(struct rte_flow_item_ipv4)); - memset_s(&ip_mask, sizeof(struct rte_flow_item_ipv4), 0, sizeof(struct rte_flow_item_ipv4)); - ip_spec.hdr.dst_addr = dst_ip; - ip_mask.hdr.dst_addr = FULL_MASK; - ip_spec.hdr.src_addr = src_ip; - ip_mask.hdr.src_addr = FULL_MASK; - pattern[1].type = RTE_FLOW_ITEM_TYPE_IPV4; - pattern[1].spec = &ip_spec; - pattern[1].mask = &ip_mask; - - // tcp header, full mask 0xffff - memset_s(&tcp_spec, sizeof(struct rte_flow_item_tcp), 0, sizeof(struct rte_flow_item_tcp)); - memset_s(&tcp_mask, sizeof(struct rte_flow_item_tcp), 0, sizeof(struct rte_flow_item_tcp)); - pattern[2].type = RTE_FLOW_ITEM_TYPE_TCP; // 2: pattern 2 is tcp header - tcp_spec.hdr.src_port = src_port; - tcp_spec.hdr.dst_port = dst_port; - tcp_mask.hdr.src_port = rte_flow_item_tcp_mask.hdr.src_port; - tcp_mask.hdr.dst_port = rte_flow_item_tcp_mask.hdr.dst_port; - pattern[2].spec = &tcp_spec; - pattern[2].mask = &tcp_mask; - - /* the final level must be always type end */ - pattern[3].type = RTE_FLOW_ITEM_TYPE_END; - res = rte_flow_validate(port_id, &attr, pattern, action, error); - if (!res) { - flow = rte_flow_create(port_id, &attr, pattern, action, error); - } else { - LSTACK_LOG(ERR, LSTACK, "rte_flow_create.rte_flow_validate error, res %d \n", res); - } - - return flow; -} - -void config_flow_director(uint16_t queue_id, uint32_t src_ip, - uint32_t dst_ip, uint16_t src_port, uint16_t dst_port) -{ - uint16_t port_id = get_protocol_stack_group()->port_id; - char rule_key[RULE_KEY_LEN] = {0}; - sprintf_s(rule_key, sizeof(rule_key), "%u_%u_%u", src_ip, src_port, dst_port); - struct flow_rule *fl_exist = find_rule(rule_key); - if (fl_exist != NULL) { - return; - } - - LSTACK_LOG(INFO, LSTACK, - "config_flow_director, flow queue_id %u, src_ip %u,src_port_ntohs:%u, dst_port_ntohs:%u\n", - queue_id, src_ip, ntohs(src_port), ntohs(dst_port)); - - struct rte_flow_error error; - struct rte_flow *flow = create_flow_director(port_id, queue_id, src_ip, dst_ip, src_port, dst_port, &error); - if (!flow) { - LSTACK_LOG(ERR, LSTACK,"flow can not be created. queue_id %u, src_ip %u, src_port %u," - "dst_port %u, dst_port_ntohs :%u, type %d. message: %s\n", - queue_id, src_ip, src_port, dst_port, ntohs(dst_port), - error.type, error.message ? error.message : "(no stated reason)"); - return; - } - __sync_fetch_and_add(&g_flow_num, 1); - add_rule(rule_key, flow); -} - -void delete_flow_director(uint32_t dst_ip, uint16_t src_port, uint16_t dst_port) -{ - uint16_t port_id = get_protocol_stack_group()->port_id; - char rule_key[RULE_KEY_LEN] = {0}; - sprintf_s(rule_key, RULE_KEY_LEN, "%u_%u_%u",dst_ip, dst_port, src_port); - struct flow_rule *fl = find_rule(rule_key); - - if(fl != NULL){ - struct rte_flow_error error; - int ret = rte_flow_destroy(port_id, fl->flow, &error); - if(ret != 0){ - LSTACK_LOG(ERR, LSTACK, "Flow can't be delete %d message: %s\n", - error.type, error.message ? error.message : "(no stated reason)"); - } - delete_rule(rule_key); - __sync_fetch_and_sub(&g_flow_num, 1); - } -} - -/* if process 0, delete directly, else transfer 'dst_ip,src_port,dst_port' to process 0. */ -void transfer_delete_rule_info_to_process0(uint32_t dst_ip, uint16_t src_port, uint16_t dst_port) -{ - if (get_global_cfg_params()->is_primary) { - delete_flow_director(dst_ip, src_port, dst_port); - } else { - char process_server_path[DELETE_FLOWS_PARAMS_LENGTH]; - sprintf_s(process_server_path, DELETE_FLOWS_PARAMS_LENGTH, "%u%s%u%s%u", - dst_ip, SPLIT_DELIM, src_port, SPLIT_DELIM, dst_port); - int ret = transfer_pkt_to_other_process(process_server_path, 0, DELETE_FLOWS_PARAMS_LENGTH, false); - if(ret != TRANSFER_SUCESS){ - LSTACK_LOG(ERR, LSTACK, "error. tid %d. dst_ip %u, src_port: %u, dst_port %u\n", - rte_gettid(), dst_ip, src_port, dst_port); - } - } -} - -// if process 0, add directly, else transfer 'src_ip,dst_ip,src_port,dst_port,queue_id' to process 0. -void transfer_create_rule_info_to_process0(uint16_t queue_id, uint32_t src_ip, - uint32_t dst_ip, uint16_t src_port, - uint16_t dst_port) -{ - char process_server_path[CREATE_FLOWS_PARAMS_LENGTH]; - /* exchage src_ip and dst_ip, src_port and dst_port */ - uint8_t process_idx = get_global_cfg_params()->process_idx; - sprintf_s(process_server_path, CREATE_FLOWS_PARAMS_LENGTH, "%u%s%u%s%u%s%u%s%u%s%u", - dst_ip, SPLIT_DELIM, src_ip, SPLIT_DELIM, - dst_port, SPLIT_DELIM, src_port, SPLIT_DELIM, - queue_id, SPLIT_DELIM, process_idx); - int ret = transfer_pkt_to_other_process(process_server_path, 0, CREATE_FLOWS_PARAMS_LENGTH, true); - if (ret != TRANSFER_SUCESS) { - LSTACK_LOG(ERR, LSTACK, "error. tid %d. src_ip %u, dst_ip %u, src_port: %u, dst_port %u," - "queue_id %u, process_idx %u\n", - rte_gettid(), src_ip, dst_ip, src_port, dst_port, queue_id, process_idx); - } -} - -void transfer_add_or_delete_listen_port_to_process0(uint16_t listen_port, uint8_t process_idx, uint8_t is_add) -{ - char process_server_path[ADD_OR_DELETE_LISTEN_PORT_PARAMS_LENGTH]; - sprintf_s(process_server_path, ADD_OR_DELETE_LISTEN_PORT_PARAMS_LENGTH, - "%u%s%u%s%u", listen_port, SPLIT_DELIM, process_idx, SPLIT_DELIM, is_add); - int ret = transfer_pkt_to_other_process(process_server_path, 0, ADD_OR_DELETE_LISTEN_PORT_PARAMS_LENGTH, true); - if(ret != TRANSFER_SUCESS) { - LSTACK_LOG(ERR, LSTACK, "error. tid %d. listen_port %u, process_idx %u\n", - rte_gettid(), listen_port, process_idx); - } -} - -static int str_to_array(char *args, uint32_t *array, int size) -{ - int val; - uint16_t cnt = 0; - char *elem = NULL; - char *next_token = NULL; - - memset_s(array, sizeof(*array) * size, 0, sizeof(*array) * size); - elem = strtok_s((char *)args, SPLIT_DELIM, &next_token); - while (elem != NULL) { - if (cnt >= size) { - return -1; - } - val = atoi(elem); - if (val < 0) { - return -1; - } - array[cnt] = (uint32_t)val; - cnt++; - - elem = strtok_s(NULL, SPLIT_DELIM, &next_token); - } - - return cnt; -} - -void parse_and_delete_rule(char* buf) -{ - uint32_t array[DELETE_FLOWS_PARAMS_NUM]; - str_to_array(buf, array, DELETE_FLOWS_PARAMS_NUM); - uint32_t dst_ip = array[0]; - uint16_t src_port = array[1]; - uint16_t dst_port = array[2]; - delete_flow_director(dst_ip, src_port, dst_port); -} - -void add_user_process_port(uint16_t dst_port, uint8_t process_idx, enum port_type type) -{ - if (type == PORT_LISTEN) { - g_listen_ports[dst_port] = process_idx; - } else { - g_user_ports[dst_port] = process_idx; - } -} - -void delete_user_process_port(uint16_t dst_port, enum port_type type) -{ - if (type == PORT_LISTEN) { - g_listen_ports[dst_port] = INVAILD_PROCESS_IDX; - } else { - g_user_ports[dst_port] = INVAILD_PROCESS_IDX; - } -} - -void parse_and_create_rule(char* buf) -{ - uint32_t array[CREATE_FLOWS_PARAMS_NUM]; - str_to_array(buf, array, CREATE_FLOWS_PARAMS_NUM); - uint32_t src_ip = array[0]; - uint32_t dst_ip = array[1]; - uint16_t src_port = array[2]; - uint16_t dst_port = array[3]; - uint16_t queue_id = array[4]; - uint8_t process_idx = array[5]; - config_flow_director(queue_id, src_ip, dst_ip, src_port, dst_port); - add_user_process_port(dst_port, process_idx, PORT_CONNECT); -} - -void parse_and_add_or_delete_listen_port(char* buf) -{ - uint32_t array[ADD_OR_DELETE_LISTEN_PORT_PARAMS_NUM]; - str_to_array(buf, array, ADD_OR_DELETE_LISTEN_PORT_PARAMS_NUM); - uint16_t listen_port = array[0]; - uint8_t process_idx = array[1]; - uint8_t is_add = array[2]; - if (is_add == 1) { - add_user_process_port(listen_port, process_idx, PORT_LISTEN); - } else { - delete_user_process_port(listen_port, PORT_LISTEN); - } - -} - -void transfer_arp_to_other_process(struct rte_mbuf *mbuf) -{ - struct cfg_params *cfgs = get_global_cfg_params(); - - for(int i = 1; i < cfgs->num_process; i++){ - char arp_mbuf[LSTACK_MBUF_LEN] = {0}; - sprintf_s(arp_mbuf, sizeof(arp_mbuf), "%lu", mbuf); - int result = transfer_pkt_to_other_process(arp_mbuf, i, LSTACK_MBUF_LEN, false); - if (result == CONNECT_ERROR) { - LSTACK_LOG(INFO, LSTACK,"connect process %d failed, ensure the process is started.\n", i); - } else if (result == REPLY_ERROR) { - LSTACK_LOG(ERR, LSTACK,"transfer arp pakages to process %d error. %m\n", i); - } - } -} - -void transfer_tcp_to_thread(struct rte_mbuf *mbuf, uint16_t stk_idx) -{ - /* current process queue_id */ - struct protocol_stack *stack = get_protocol_stack_group()->stacks[stk_idx]; - int ret = -1; - while(ret != 0) { - ret = rpc_call_arp(&stack->rpc_queue, mbuf); - printf("transfer_tcp_to_thread, ret : %d \n", ret); - } -} - -void parse_arp_and_transefer(char* buf) -{ - struct rte_mbuf *mbuf = (struct rte_mbuf *)atoll(buf); - struct protocol_stack_group *stack_group = get_protocol_stack_group(); - struct rte_mbuf *mbuf_copy = NULL; - struct protocol_stack *stack = NULL; - int32_t ret; - for (int32_t i = 0; i < stack_group->stack_num; i++) { - stack = stack_group->stacks[i]; - ret = dpdk_alloc_pktmbuf(stack->rxtx_mbuf_pool, &mbuf_copy, 1, true); - while (ret != 0) { - ret = dpdk_alloc_pktmbuf(stack->rxtx_mbuf_pool, &mbuf_copy, 1, true); - stack->stats.rx_allocmbuf_fail++; - } - copy_mbuf(mbuf_copy, mbuf); - - ret = rpc_call_arp(&stack->rpc_queue, mbuf_copy); - - while (ret != 0) { - rpc_call_arp(&stack->rpc_queue, mbuf_copy); - } - } -} - -void parse_tcp_and_transefer(char* buf) -{ - char *next_token = NULL; - char *elem = strtok_s(buf, SPLIT_DELIM, &next_token); - struct rte_mbuf *mbuf = (struct rte_mbuf *) atoll(elem); - elem = strtok_s(NULL, SPLIT_DELIM, &next_token); - uint16_t queue_id = atoll(elem); - - struct protocol_stack_group *stack_group = get_protocol_stack_group(); - uint16_t num_queue = get_global_cfg_params()->num_queue; - uint16_t stk_index = queue_id % num_queue; - struct rte_mbuf *mbuf_copy = NULL; - struct protocol_stack *stack = stack_group->stacks[stk_index]; - - int32_t ret = dpdk_alloc_pktmbuf(stack->rxtx_mbuf_pool, &mbuf_copy, 1, true); - while (ret != 0) { - ret = dpdk_alloc_pktmbuf(stack->rxtx_mbuf_pool, &mbuf_copy, 1, true); - stack->stats.rx_allocmbuf_fail++; - } - - copy_mbuf(mbuf_copy,mbuf); - - transfer_tcp_to_thread(mbuf_copy, stk_index); -} - -int recv_pkts_from_other_process(int process_index, void* arg) -{ - struct sockaddr_un serun, cliun; - socklen_t cliun_len; - int listenfd, connfd, size; - char buf[132]; - /* socket */ - if ((listenfd = posix_api->socket_fn(AF_UNIX, SOCK_STREAM, 0)) < 0) { - perror("socket error"); - return -1; - } - /* bind */ - memset_s(&serun, sizeof(serun), 0, sizeof(serun)); - serun.sun_family = AF_UNIX; - char process_server_path[PATH_MAX]; - sprintf_s(process_server_path, sizeof(process_server_path), "%s%d", SERVER_PATH, process_index); - strcpy_s(serun.sun_path, sizeof(serun.sun_path), process_server_path); - size = offsetof(struct sockaddr_un, sun_path) + strlen(serun.sun_path); - unlink(process_server_path); - if (posix_api->bind_fn(listenfd, (struct sockaddr *)&serun, size) < 0) { - perror("bind error"); - return -1; - } - if (posix_api->listen_fn(listenfd, 20) < 0) { /* 20: max backlog */ - perror("listen error"); - return -1; - } - sem_post((sem_t *)arg); - /* block */ - while(1) { - cliun_len = sizeof(cliun); - if ((connfd = posix_api->accept_fn(listenfd, (struct sockaddr *)&cliun, &cliun_len)) < 0) { - perror("accept error"); - continue; - } - while(1) { - int n = posix_api->read_fn(connfd, buf, sizeof(buf)); - if (n < 0) { - perror("read error"); - break; - } else if (n == 0) { - break; - } - - if(n == LSTACK_MBUF_LEN) { - /* arp */ - parse_arp_and_transefer(buf); - } else if (n == TRANSFER_TCP_MUBF_LEN) { - /* tcp. lstack_mbuf_queue_id */ - parse_tcp_and_transefer(buf); - } else if (n == DELETE_FLOWS_PARAMS_LENGTH) { - /* delete rule */ - parse_and_delete_rule(buf); - } else if(n == CREATE_FLOWS_PARAMS_LENGTH) { - /* add rule */ - parse_and_create_rule(buf); - char reply_buf[REPLY_LEN]; - sprintf_s(reply_buf, sizeof(reply_buf), "%s", SUCCESS_REPLY); - posix_api->write_fn(connfd, reply_buf, REPLY_LEN); - } else if (n == GET_LSTACK_NUM) { - char reply_buf[REPLY_LEN]; - sprintf_s(reply_buf, sizeof(reply_buf), "%d", get_global_cfg_params()->num_cpu); - posix_api->write_fn(connfd, reply_buf, REPLY_LEN); - } else { - /* add port */ - parse_and_add_or_delete_listen_port(buf); - char reply_buf[REPLY_LEN]; - sprintf_s(reply_buf, sizeof(reply_buf), "%s", SUCCESS_REPLY); - posix_api->write_fn(connfd, reply_buf, REPLY_LEN); - } - - } - posix_api->close_fn(connfd); - } - posix_api->close_fn(listenfd); - return 0; -} - -void concat_mbuf_and_queue_id(struct rte_mbuf *mbuf, uint16_t queue_id, - char* mbuf_and_queue_id, int write_len) -{ - sprintf_s(mbuf_and_queue_id, write_len, "%lu%s%u", mbuf, SPLIT_DELIM, queue_id); -} - -static int mbuf_to_idx(struct rte_mbuf *mbuf, uint16_t *dst_port) -{ - struct rte_ether_hdr *ethh = rte_pktmbuf_mtod(mbuf, struct rte_ether_hdr *); - u16_t type = rte_be_to_cpu_16(ethh->ether_type); - uint32_t index = 0; - if (type == RTE_ETHER_TYPE_IPV4) { - struct rte_ipv4_hdr *iph = rte_pktmbuf_mtod_offset(mbuf, struct rte_ipv4_hdr *, sizeof(struct rte_ether_hdr)); - uint8_t ip_version = (iph->version_ihl & 0xf0) >> IPV4_VERSION_OFFSET; - if (likely(ip_version == IPV4_VERSION)) { - if (likely(iph->next_proto_id == IPPROTO_TCP)) { - struct rte_tcp_hdr *tcp_hdr = rte_pktmbuf_mtod_offset(mbuf, struct rte_tcp_hdr *, - sizeof(struct rte_ether_hdr) + sizeof(struct rte_ipv4_hdr)); - *dst_port = tcp_hdr->dst_port; - - if (unlikely(tcp_hdr->tcp_flags == TCP_SYN)) { - uint32_t src_ip = iph->src_addr; - uint16_t src_port = tcp_hdr->src_port; - index = rte_jhash_3words(src_ip, src_port | ((*dst_port) << 16), 0, 0); - } else { - return -1; - } - } - } - } else if (type == RTE_ETHER_TYPE_IPV6) { - struct rte_ipv6_hdr *iph = rte_pktmbuf_mtod_offset(mbuf, struct rte_ipv6_hdr *, sizeof(struct rte_ether_hdr)); - if (likely(iph->proto == IPPROTO_TCP)) { - struct rte_tcp_hdr *tcp_hdr = rte_pktmbuf_mtod_offset(mbuf, struct rte_tcp_hdr *, - sizeof(struct rte_ether_hdr) + sizeof(struct rte_ipv6_hdr)); - *dst_port = tcp_hdr->dst_port; - - if (unlikely(tcp_hdr->tcp_flags == TCP_SYN)) { - uint32_t *src_ip = (uint32_t *) &iph->src_addr; - uint16_t src_port = tcp_hdr->src_port; - uint32_t v = rte_jhash_3words(src_ip[0], src_ip[1], src_ip[2], 0); - index = rte_jhash_3words(src_ip[3], src_port | ((*dst_port) << 16), v, 0); - } else { - return -1; - } - } - } else { - return -1; - } - return index; -} - -int distribute_pakages(struct rte_mbuf *mbuf) -{ - uint16_t dst_port = 0; - uint32_t index = mbuf_to_idx(mbuf, &dst_port); - if (index == -1) { - return TRANSFER_CURRENT_THREAD; - } - - uint16_t queue_id = 0; - uint32_t user_process_idx = 0; - int each_process_queue_num = get_global_cfg_params()->num_queue; - index = index % each_process_queue_num; - if (g_listen_ports[dst_port] != INVAILD_PROCESS_IDX) { - user_process_idx = g_listen_ports[dst_port]; - } else { - user_process_idx = g_user_ports[dst_port]; - } - - if (user_process_idx == INVAILD_PROCESS_IDX) { - return TRANSFER_KERNEL; - } - - if (get_global_cfg_params()->seperate_send_recv) { - queue_id = user_process_idx * each_process_queue_num + (index / 2) * 2; - } else { - queue_id = user_process_idx * each_process_queue_num + index; - } - if (queue_id != 0) { - if (user_process_idx == 0) { - transfer_tcp_to_thread(mbuf, queue_id); - } else { - char mbuf_and_queue_id[TRANSFER_TCP_MUBF_LEN]; - concat_mbuf_and_queue_id(mbuf, queue_id, mbuf_and_queue_id, TRANSFER_TCP_MUBF_LEN); - transfer_pkt_to_other_process(mbuf_and_queue_id, user_process_idx, - TRANSFER_TCP_MUBF_LEN, false); - } - return TRANSFER_OTHER_THREAD; - } else { - return TRANSFER_CURRENT_THREAD; - } - - return TRANSFER_KERNEL; -} - #if RTE_VERSION < RTE_VERSION_NUM(23, 11, 0, 0) void kni_handle_rx(uint16_t port_id) { @@ -797,17 +129,18 @@ void kni_handle_tx(struct rte_mbuf *mbuf) } #endif -/* optimized eth_dev_poll() in lstack */ -int32_t gazelle_eth_dev_poll(struct protocol_stack *stack, uint8_t use_ltran_flag, uint32_t nic_read_number) +int32_t eth_dev_poll(void) { uint32_t nr_pkts; + struct cfg_params *cfg = get_global_cfg_params(); + struct protocol_stack *stack = get_protocol_stack(); - nr_pkts = stack->dev_ops.rx_poll(stack, stack->pkts, nic_read_number); + nr_pkts = stack->dev_ops.rx_poll(stack, stack->pkts, cfg->nic_read_number); if (nr_pkts == 0) { return 0; } - if (!use_ltran_flag && get_protocol_stack_group()->latency_start) { + if (!use_ltran() && get_protocol_stack_group()->latency_start) { uint64_t time_stamp = get_current_time(); time_stamp_into_mbuf(nr_pkts, stack->pkts, time_stamp); } @@ -816,7 +149,7 @@ int32_t gazelle_eth_dev_poll(struct protocol_stack *stack, uint8_t use_ltran_fla /* 1 current thread recv; 0 other thread recv; -1 kni recv; */ int transfer_type = TRANSFER_CURRENT_THREAD; /* copy arp into other stack */ - if (!use_ltran_flag) { + if (!use_ltran()) { struct rte_ether_hdr *ethh = rte_pktmbuf_mtod(stack->pkts[i], struct rte_ether_hdr *); u16_t type; type = ethh->ether_type; @@ -946,6 +279,10 @@ int32_t ethdev_init(struct protocol_stack *stack) LSTACK_LOG(ERR, LSTACK, "fill mbuf to rx_ring failed ret=%d\n", ret); return ret; } + } else { + if (cfg->tuple_filter && stack->queue_id == 0) { + flow_init(); + } } netif_set_default(&stack->netif); diff --git a/src/lstack/netif/lstack_flow.c b/src/lstack/netif/lstack_flow.c new file mode 100644 index 0000000..4e04209 --- /dev/null +++ b/src/lstack/netif/lstack_flow.c @@ -0,0 +1,680 @@ +/* +* Copyright (c) Huawei Technologies Co., Ltd. 2020-2021. All rights reserved. +* gazelle is licensed under the Mulan PSL v2. +* You can use this software according to the terms and conditions of the Mulan PSL v2. +* You may obtain a copy of Mulan PSL v2 at: +* http://license.coscl.org.cn/MulanPSL2 +* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR +* IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY OR FIT FOR A PARTICULAR +* PURPOSE. +* See the Mulan PSL v2 for more details. +*/ +#include <sys/socket.h> +#include <sys/un.h> +#include <securec.h> + +#include <rte_mbuf.h> +#include <rte_flow.h> +#include <rte_jhash.h> +#include <uthash.h> + +#include <lwip/posix_api.h> +#include <lwip/tcp.h> +#include <lwip/prot/tcp.h> + +#include "dpdk_common.h" +#include "lstack_log.h" +#include "lstack_dpdk.h" +#include "lstack_cfg.h" +#include "lstack_protocol_stack.h" +#include "lstack_flow.h" + +#define MAX_PATTERN_NUM 4 +#define MAX_ACTION_NUM 2 +#define FULL_MASK 0xffffffff /* full mask */ +#define EMPTY_MASK 0x0 /* empty mask */ +#define LSTACK_MBUF_LEN 64 +#define TRANSFER_TCP_MUBF_LEN (LSTACK_MBUF_LEN + 3) +#define DELETE_FLOWS_PARAMS_NUM 3 +#define DELETE_FLOWS_PARAMS_LENGTH 30 +#define CREATE_FLOWS_PARAMS_NUM 6 +#define CREATE_FLOWS_PARAMS_LENGTH 60 +#define ADD_OR_DELETE_LISTEN_PORT_PARAMS_LENGTH 25 +#define ADD_OR_DELETE_LISTEN_PORT_PARAMS_NUM 3 +#define REPLY_LEN 10 +#define SUCCESS_REPLY "success" +#define ERROR_REPLY "error" + +#define GET_LSTACK_NUM 14 +#define GET_LSTACK_NUM_STRING "get_lstack_num" + +#define SERVER_PATH "/var/run/gazelle/server.socket" +#define SPLIT_DELIM "," + +#define UNIX_TCP_PORT_MAX 65535 + +#define INVAILD_PROCESS_IDX 255 + +#define IPV4_VERSION_OFFSET 4 +#define IPV4_VERSION 4 + +static uint8_t g_user_ports[UNIX_TCP_PORT_MAX] = {INVAILD_PROCESS_IDX, }; +static uint8_t g_listen_ports[UNIX_TCP_PORT_MAX] = {INVAILD_PROCESS_IDX, }; + +/* flow rule map */ +#define RULE_KEY_LEN 23 +struct flow_rule { + char rule_key[RULE_KEY_LEN]; + struct rte_flow *flow; + UT_hash_handle hh; +}; + +static uint16_t g_flow_num = 0; +static struct flow_rule *g_flow_rules = NULL; +static struct flow_rule *find_rule(char *rule_key) +{ + struct flow_rule *fl; + HASH_FIND_STR(g_flow_rules, rule_key, fl); + return fl; +} + +static void add_rule(char* rule_key, struct rte_flow *flow) +{ + struct flow_rule *rule; + HASH_FIND_STR(g_flow_rules, rule_key, rule); + if (rule == NULL) { + rule = (struct flow_rule*)malloc(sizeof(struct flow_rule)); + strcpy_s(rule->rule_key, RULE_KEY_LEN, rule_key); + HASH_ADD_STR(g_flow_rules, rule_key, rule); + } + rule->flow = flow; +} + +static void delete_rule(char* rule_key) +{ + struct flow_rule *rule = NULL; + HASH_FIND_STR(g_flow_rules, rule_key, rule); + if (rule != NULL) { + HASH_DEL(g_flow_rules, rule); + free(rule); + } +} + +static void init_listen_and_user_ports(void) +{ + memset_s(g_user_ports, sizeof(g_user_ports), INVAILD_PROCESS_IDX, sizeof(g_user_ports)); + memset_s(g_listen_ports, sizeof(g_listen_ports), INVAILD_PROCESS_IDX, sizeof(g_listen_ports)); +} + +static int transfer_pkt_to_other_process(char *buf, int process_index, int write_len, bool need_reply) +{ + /* other process queue_id */ + struct sockaddr_un serun; + int sockfd; + int ret = 0; + + sockfd = posix_api->socket_fn(AF_UNIX, SOCK_STREAM, 0); + memset_s(&serun, sizeof(serun), 0, sizeof(serun)); + serun.sun_family = AF_UNIX; + sprintf_s(serun.sun_path, PATH_MAX, "%s%d", SERVER_PATH, process_index); + int32_t len = offsetof(struct sockaddr_un, sun_path) + strlen(serun.sun_path); + if (posix_api->connect_fn(sockfd, (struct sockaddr *)&serun, len) < 0) { + return CONNECT_ERROR; + } + posix_api->write_fn(sockfd, buf, write_len); + if (need_reply) { + char reply_message[REPLY_LEN]; + int32_t read_result = posix_api->read_fn(sockfd, reply_message, REPLY_LEN); + if (read_result > 0) { + if (strcmp(reply_message, SUCCESS_REPLY) == 0) { + ret = TRANSFER_SUCESS; + } else if (strcmp(reply_message, ERROR_REPLY) == 0) { + ret = REPLY_ERROR; + } else { + ret = atoi(reply_message); + } + } else { + ret = REPLY_ERROR; + } + } + posix_api->close_fn(sockfd); + + return ret; +} + +int32_t check_params_from_primary(void) +{ + struct cfg_params *cfg = get_global_cfg_params(); + if (cfg->is_primary) { + return 0; + } + // check lstack num + char get_lstack_num[GET_LSTACK_NUM]; + sprintf_s(get_lstack_num, GET_LSTACK_NUM, "%s", GET_LSTACK_NUM_STRING); + int32_t ret = transfer_pkt_to_other_process(get_lstack_num, 0, GET_LSTACK_NUM, true); + if (ret != cfg->num_cpu) { + return -1; + } + return 0; +} + +static struct rte_flow *create_flow_director(uint16_t port_id, uint16_t queue_id, + uint32_t src_ip, uint32_t dst_ip, + uint16_t src_port, uint16_t dst_port, + struct rte_flow_error *error) +{ + struct rte_flow_attr attr; + struct rte_flow_item pattern[MAX_PATTERN_NUM]; + struct rte_flow_action action[MAX_ACTION_NUM]; + struct rte_flow *flow = NULL; + struct rte_flow_action_queue queue = { .index = queue_id }; + struct rte_flow_item_ipv4 ip_spec; + struct rte_flow_item_ipv4 ip_mask; + + struct rte_flow_item_tcp tcp_spec; + struct rte_flow_item_tcp tcp_mask; + int res; + + memset_s(pattern, sizeof(pattern), 0, sizeof(pattern)); + memset_s(action, sizeof(action), 0, sizeof(action)); + + /* + * set the rule attribute. + * in this case only ingress packets will be checked. + */ + memset_s(&attr, sizeof(struct rte_flow_attr), 0, sizeof(struct rte_flow_attr)); + attr.ingress = 1; + + /* + * create the action sequence. + * one action only, move packet to queue + */ + action[0].type = RTE_FLOW_ACTION_TYPE_QUEUE; + action[0].conf = &queue; + action[1].type = RTE_FLOW_ACTION_TYPE_END; + + // not limit eth header + pattern[0].type = RTE_FLOW_ITEM_TYPE_ETH; + + // ip header + memset_s(&ip_spec, sizeof(struct rte_flow_item_ipv4), 0, sizeof(struct rte_flow_item_ipv4)); + memset_s(&ip_mask, sizeof(struct rte_flow_item_ipv4), 0, sizeof(struct rte_flow_item_ipv4)); + ip_spec.hdr.dst_addr = dst_ip; + ip_mask.hdr.dst_addr = FULL_MASK; + ip_spec.hdr.src_addr = src_ip; + ip_mask.hdr.src_addr = FULL_MASK; + pattern[1].type = RTE_FLOW_ITEM_TYPE_IPV4; + pattern[1].spec = &ip_spec; + pattern[1].mask = &ip_mask; + + // tcp header, full mask 0xffff + memset_s(&tcp_spec, sizeof(struct rte_flow_item_tcp), 0, sizeof(struct rte_flow_item_tcp)); + memset_s(&tcp_mask, sizeof(struct rte_flow_item_tcp), 0, sizeof(struct rte_flow_item_tcp)); + pattern[2].type = RTE_FLOW_ITEM_TYPE_TCP; // 2: pattern 2 is tcp header + tcp_spec.hdr.src_port = src_port; + tcp_spec.hdr.dst_port = dst_port; + tcp_mask.hdr.src_port = rte_flow_item_tcp_mask.hdr.src_port; + tcp_mask.hdr.dst_port = rte_flow_item_tcp_mask.hdr.dst_port; + pattern[2].spec = &tcp_spec; + pattern[2].mask = &tcp_mask; + + /* the final level must be always type end */ + pattern[3].type = RTE_FLOW_ITEM_TYPE_END; + res = rte_flow_validate(port_id, &attr, pattern, action, error); + if (!res) { + flow = rte_flow_create(port_id, &attr, pattern, action, error); + } else { + LSTACK_LOG(ERR, LSTACK, "rte_flow_create.rte_flow_validate error, res %d \n", res); + } + + return flow; +} + +static void config_flow_director(uint16_t queue_id, uint32_t src_ip, + uint32_t dst_ip, uint16_t src_port, uint16_t dst_port) +{ + uint16_t port_id = get_protocol_stack_group()->port_id; + char rule_key[RULE_KEY_LEN] = {0}; + sprintf_s(rule_key, sizeof(rule_key), "%u_%u_%u", src_ip, src_port, dst_port); + struct flow_rule *fl_exist = find_rule(rule_key); + if (fl_exist != NULL) { + return; + } + + LSTACK_LOG(INFO, LSTACK, + "config_flow_director, flow queue_id %u, src_ip %u,src_port_ntohs:%u, dst_port_ntohs:%u\n", + queue_id, src_ip, ntohs(src_port), ntohs(dst_port)); + + struct rte_flow_error error; + struct rte_flow *flow = create_flow_director(port_id, queue_id, src_ip, dst_ip, src_port, dst_port, &error); + if (!flow) { + LSTACK_LOG(ERR, LSTACK,"flow can not be created. queue_id %u, src_ip %u, src_port %u," + "dst_port %u, dst_port_ntohs :%u, type %d. message: %s\n", + queue_id, src_ip, src_port, dst_port, ntohs(dst_port), + error.type, error.message ? error.message : "(no stated reason)"); + return; + } + __sync_fetch_and_add(&g_flow_num, 1); + add_rule(rule_key, flow); +} + +static void delete_flow_director(uint32_t dst_ip, uint16_t src_port, uint16_t dst_port) +{ + uint16_t port_id = get_protocol_stack_group()->port_id; + char rule_key[RULE_KEY_LEN] = {0}; + sprintf_s(rule_key, RULE_KEY_LEN, "%u_%u_%u",dst_ip, dst_port, src_port); + struct flow_rule *fl = find_rule(rule_key); + + if(fl != NULL) { + struct rte_flow_error error; + int ret = rte_flow_destroy(port_id, fl->flow, &error); + if (ret != 0) { + LSTACK_LOG(ERR, LSTACK, "Flow can't be delete %d message: %s\n", + error.type, error.message ? error.message : "(no stated reason)"); + } + delete_rule(rule_key); + __sync_fetch_and_sub(&g_flow_num, 1); + } +} + +/* if process 0, delete directly, else transfer 'dst_ip,src_port,dst_port' to process 0. */ +void transfer_delete_rule_info_to_process0(uint32_t dst_ip, uint16_t src_port, uint16_t dst_port) +{ + if (get_global_cfg_params()->is_primary) { + delete_flow_director(dst_ip, src_port, dst_port); + } else { + char process_server_path[DELETE_FLOWS_PARAMS_LENGTH]; + sprintf_s(process_server_path, DELETE_FLOWS_PARAMS_LENGTH, "%u%s%u%s%u", + dst_ip, SPLIT_DELIM, src_port, SPLIT_DELIM, dst_port); + int ret = transfer_pkt_to_other_process(process_server_path, 0, DELETE_FLOWS_PARAMS_LENGTH, false); + if (ret != TRANSFER_SUCESS) { + LSTACK_LOG(ERR, LSTACK, "error. tid %d. dst_ip %u, src_port: %u, dst_port %u\n", + rte_gettid(), dst_ip, src_port, dst_port); + } + } +} + +// if process 0, add directly, else transfer 'src_ip,dst_ip,src_port,dst_port,queue_id' to process 0. +void transfer_create_rule_info_to_process0(uint16_t queue_id, uint32_t src_ip, + uint32_t dst_ip, uint16_t src_port, + uint16_t dst_port) +{ + char process_server_path[CREATE_FLOWS_PARAMS_LENGTH]; + /* exchage src_ip and dst_ip, src_port and dst_port */ + uint8_t process_idx = get_global_cfg_params()->process_idx; + sprintf_s(process_server_path, CREATE_FLOWS_PARAMS_LENGTH, "%u%s%u%s%u%s%u%s%u%s%u", + dst_ip, SPLIT_DELIM, src_ip, SPLIT_DELIM, + dst_port, SPLIT_DELIM, src_port, SPLIT_DELIM, + queue_id, SPLIT_DELIM, process_idx); + int ret = transfer_pkt_to_other_process(process_server_path, 0, CREATE_FLOWS_PARAMS_LENGTH, true); + if (ret != TRANSFER_SUCESS) { + LSTACK_LOG(ERR, LSTACK, "error. tid %d. src_ip %u, dst_ip %u, src_port: %u, dst_port %u," + "queue_id %u, process_idx %u\n", + rte_gettid(), src_ip, dst_ip, src_port, dst_port, queue_id, process_idx); + } +} + +void transfer_add_or_delete_listen_port_to_process0(uint16_t listen_port, uint8_t process_idx, uint8_t is_add) +{ + char process_server_path[ADD_OR_DELETE_LISTEN_PORT_PARAMS_LENGTH]; + sprintf_s(process_server_path, ADD_OR_DELETE_LISTEN_PORT_PARAMS_LENGTH, + "%u%s%u%s%u", listen_port, SPLIT_DELIM, process_idx, SPLIT_DELIM, is_add); + int ret = transfer_pkt_to_other_process(process_server_path, 0, ADD_OR_DELETE_LISTEN_PORT_PARAMS_LENGTH, true); + if (ret != TRANSFER_SUCESS) { + LSTACK_LOG(ERR, LSTACK, "error. tid %d. listen_port %u, process_idx %u\n", + rte_gettid(), listen_port, process_idx); + } +} + +static int str_to_array(char *args, uint32_t *array, int size) +{ + int val; + uint16_t cnt = 0; + char *elem = NULL; + char *next_token = NULL; + + memset_s(array, sizeof(*array) * size, 0, sizeof(*array) * size); + elem = strtok_s((char *)args, SPLIT_DELIM, &next_token); + while (elem != NULL) { + if (cnt >= size) { + return -1; + } + val = atoi(elem); + if (val < 0) { + return -1; + } + array[cnt] = (uint32_t)val; + cnt++; + + elem = strtok_s(NULL, SPLIT_DELIM, &next_token); + } + + return cnt; +} + +static void parse_and_delete_rule(char* buf) +{ + uint32_t array[DELETE_FLOWS_PARAMS_NUM]; + str_to_array(buf, array, DELETE_FLOWS_PARAMS_NUM); + uint32_t dst_ip = array[0]; + uint16_t src_port = array[1]; + uint16_t dst_port = array[2]; + delete_flow_director(dst_ip, src_port, dst_port); +} + +void add_user_process_port(uint16_t dst_port, uint8_t process_idx, enum port_type type) +{ + if (type == PORT_LISTEN) { + g_listen_ports[dst_port] = process_idx; + } else { + g_user_ports[dst_port] = process_idx; + } +} + +void delete_user_process_port(uint16_t dst_port, enum port_type type) +{ + if (type == PORT_LISTEN) { + g_listen_ports[dst_port] = INVAILD_PROCESS_IDX; + } else { + g_user_ports[dst_port] = INVAILD_PROCESS_IDX; + } +} + +static void parse_and_create_rule(char* buf) +{ + uint32_t array[CREATE_FLOWS_PARAMS_NUM]; + str_to_array(buf, array, CREATE_FLOWS_PARAMS_NUM); + uint32_t src_ip = array[0]; + uint32_t dst_ip = array[1]; + uint16_t src_port = array[2]; + uint16_t dst_port = array[3]; + uint16_t queue_id = array[4]; + uint8_t process_idx = array[5]; + config_flow_director(queue_id, src_ip, dst_ip, src_port, dst_port); + add_user_process_port(dst_port, process_idx, PORT_CONNECT); +} + +static void parse_and_add_or_delete_listen_port(char* buf) +{ + uint32_t array[ADD_OR_DELETE_LISTEN_PORT_PARAMS_NUM]; + str_to_array(buf, array, ADD_OR_DELETE_LISTEN_PORT_PARAMS_NUM); + uint16_t listen_port = array[0]; + uint8_t process_idx = array[1]; + uint8_t is_add = array[2]; + if (is_add == 1) { + add_user_process_port(listen_port, process_idx, PORT_LISTEN); + } else { + delete_user_process_port(listen_port, PORT_LISTEN); + } +} + +void transfer_arp_to_other_process(struct rte_mbuf *mbuf) +{ + struct cfg_params *cfgs = get_global_cfg_params(); + + for (int i = 1; i < cfgs->num_process; i++) { + char arp_mbuf[LSTACK_MBUF_LEN] = {0}; + sprintf_s(arp_mbuf, sizeof(arp_mbuf), "%lu", mbuf); + int result = transfer_pkt_to_other_process(arp_mbuf, i, LSTACK_MBUF_LEN, false); + if (result == CONNECT_ERROR) { + LSTACK_LOG(INFO, LSTACK,"connect process %d failed, ensure the process is started.\n", i); + } else if (result == REPLY_ERROR) { + LSTACK_LOG(ERR, LSTACK,"transfer arp pakages to process %d error. %m\n", i); + } + } +} + +static void transfer_tcp_to_thread(struct rte_mbuf *mbuf, uint16_t stk_idx) +{ + /* current process queue_id */ + struct protocol_stack *stack = get_protocol_stack_group()->stacks[stk_idx]; + int ret = -1; + while (ret != 0) { + ret = rpc_call_arp(&stack->rpc_queue, mbuf); + printf("transfer_tcp_to_thread, ret : %d \n", ret); + } +} + +static void parse_arp_and_transefer(char* buf) +{ + struct rte_mbuf *mbuf = (struct rte_mbuf *)atoll(buf); + struct protocol_stack_group *stack_group = get_protocol_stack_group(); + struct rte_mbuf *mbuf_copy = NULL; + struct protocol_stack *stack = NULL; + int32_t ret; + for (int32_t i = 0; i < stack_group->stack_num; i++) { + stack = stack_group->stacks[i]; + ret = dpdk_alloc_pktmbuf(stack->rxtx_mbuf_pool, &mbuf_copy, 1, false); + while (ret != 0) { + ret = dpdk_alloc_pktmbuf(stack->rxtx_mbuf_pool, &mbuf_copy, 1, false); + stack->stats.rx_allocmbuf_fail++; + } + copy_mbuf(mbuf_copy, mbuf); + + ret = rpc_call_arp(&stack->rpc_queue, mbuf_copy); + + while (ret != 0) { + rpc_call_arp(&stack->rpc_queue, mbuf_copy); + } + } +} + +static void parse_tcp_and_transefer(char* buf) +{ + char *next_token = NULL; + char *elem = strtok_s(buf, SPLIT_DELIM, &next_token); + struct rte_mbuf *mbuf = (struct rte_mbuf *) atoll(elem); + elem = strtok_s(NULL, SPLIT_DELIM, &next_token); + uint16_t queue_id = atoll(elem); + + struct protocol_stack_group *stack_group = get_protocol_stack_group(); + uint16_t num_queue = get_global_cfg_params()->num_queue; + uint16_t stk_index = queue_id % num_queue; + struct rte_mbuf *mbuf_copy = NULL; + struct protocol_stack *stack = stack_group->stacks[stk_index]; + + int32_t ret = dpdk_alloc_pktmbuf(stack->rxtx_mbuf_pool, &mbuf_copy, 1, false); + while (ret != 0) { + ret = dpdk_alloc_pktmbuf(stack->rxtx_mbuf_pool, &mbuf_copy, 1, false); + stack->stats.rx_allocmbuf_fail++; + } + + copy_mbuf(mbuf_copy,mbuf); + transfer_tcp_to_thread(mbuf_copy, stk_index); +} + +int recv_pkts_from_other_process(int process_index, void* arg) +{ + struct sockaddr_un serun, cliun; + socklen_t cliun_len; + int listenfd, connfd, size; + char buf[132]; + /* socket */ + if ((listenfd = posix_api->socket_fn(AF_UNIX, SOCK_STREAM, 0)) < 0) { + perror("socket error"); + return -1; + } + /* bind */ + memset_s(&serun, sizeof(serun), 0, sizeof(serun)); + serun.sun_family = AF_UNIX; + char process_server_path[PATH_MAX]; + sprintf_s(process_server_path, sizeof(process_server_path), "%s%d", SERVER_PATH, process_index); + strcpy_s(serun.sun_path, sizeof(serun.sun_path), process_server_path); + size = offsetof(struct sockaddr_un, sun_path) + strlen(serun.sun_path); + unlink(process_server_path); + if (posix_api->bind_fn(listenfd, (struct sockaddr *)&serun, size) < 0) { + perror("bind error"); + return -1; + } + if (posix_api->listen_fn(listenfd, 20) < 0) { /* 20: max backlog */ + perror("listen error"); + return -1; + } + sem_post((sem_t *)arg); + /* block */ + while (1) { + cliun_len = sizeof(cliun); + if ((connfd = posix_api->accept_fn(listenfd, (struct sockaddr *)&cliun, &cliun_len)) < 0) { + perror("accept error"); + continue; + } + while (1) { + int n = posix_api->read_fn(connfd, buf, sizeof(buf)); + if (n < 0) { + perror("read error"); + break; + } else if (n == 0) { + break; + } + + if (n == LSTACK_MBUF_LEN) { + /* arp */ + parse_arp_and_transefer(buf); + } else if (n == TRANSFER_TCP_MUBF_LEN) { + /* tcp. lstack_mbuf_queue_id */ + parse_tcp_and_transefer(buf); + } else if (n == DELETE_FLOWS_PARAMS_LENGTH) { + /* delete rule */ + parse_and_delete_rule(buf); + } else if (n == CREATE_FLOWS_PARAMS_LENGTH) { + /* add rule */ + parse_and_create_rule(buf); + char reply_buf[REPLY_LEN]; + sprintf_s(reply_buf, sizeof(reply_buf), "%s", SUCCESS_REPLY); + posix_api->write_fn(connfd, reply_buf, REPLY_LEN); + } else if (n == GET_LSTACK_NUM) { + char reply_buf[REPLY_LEN]; + sprintf_s(reply_buf, sizeof(reply_buf), "%d", get_global_cfg_params()->num_cpu); + posix_api->write_fn(connfd, reply_buf, REPLY_LEN); + } else { + /* add port */ + parse_and_add_or_delete_listen_port(buf); + char reply_buf[REPLY_LEN]; + sprintf_s(reply_buf, sizeof(reply_buf), "%s", SUCCESS_REPLY); + posix_api->write_fn(connfd, reply_buf, REPLY_LEN); + } + + } + posix_api->close_fn(connfd); + } + posix_api->close_fn(listenfd); + return 0; +} + +void concat_mbuf_and_queue_id(struct rte_mbuf *mbuf, uint16_t queue_id, + char* mbuf_and_queue_id, int write_len) +{ + sprintf_s(mbuf_and_queue_id, write_len, "%lu%s%u", mbuf, SPLIT_DELIM, queue_id); +} + +static int mbuf_to_idx(struct rte_mbuf *mbuf, uint16_t *dst_port) +{ + struct rte_ether_hdr *ethh = rte_pktmbuf_mtod(mbuf, struct rte_ether_hdr *); + u16_t type = rte_be_to_cpu_16(ethh->ether_type); + uint32_t index = 0; + if (type == RTE_ETHER_TYPE_IPV4) { + struct rte_ipv4_hdr *iph = rte_pktmbuf_mtod_offset(mbuf, struct rte_ipv4_hdr *, sizeof(struct rte_ether_hdr)); + uint8_t ip_version = (iph->version_ihl & 0xf0) >> IPV4_VERSION_OFFSET; + if (likely(ip_version == IPV4_VERSION)) { + if (likely(iph->next_proto_id == IPPROTO_TCP)) { + struct rte_tcp_hdr *tcp_hdr = rte_pktmbuf_mtod_offset(mbuf, struct rte_tcp_hdr *, + sizeof(struct rte_ether_hdr) + sizeof(struct rte_ipv4_hdr)); + *dst_port = tcp_hdr->dst_port; + + if (unlikely(tcp_hdr->tcp_flags == TCP_SYN)) { + uint32_t src_ip = iph->src_addr; + uint16_t src_port = tcp_hdr->src_port; + index = rte_jhash_3words(src_ip, src_port | ((*dst_port) << 16), 0, 0); + } else { + return -1; + } + } + } + } else if (type == RTE_ETHER_TYPE_IPV6) { + struct rte_ipv6_hdr *iph = rte_pktmbuf_mtod_offset(mbuf, struct rte_ipv6_hdr *, sizeof(struct rte_ether_hdr)); + if (likely(iph->proto == IPPROTO_TCP)) { + struct rte_tcp_hdr *tcp_hdr = rte_pktmbuf_mtod_offset(mbuf, struct rte_tcp_hdr *, + sizeof(struct rte_ether_hdr) + sizeof(struct rte_ipv6_hdr)); + *dst_port = tcp_hdr->dst_port; + + if (unlikely(tcp_hdr->tcp_flags == TCP_SYN)) { + uint32_t *src_ip = (uint32_t *) &iph->src_addr; + uint16_t src_port = tcp_hdr->src_port; + uint32_t v = rte_jhash_3words(src_ip[0], src_ip[1], src_ip[2], 0); + index = rte_jhash_3words(src_ip[3], src_port | ((*dst_port) << 16), v, 0); + } else { + return -1; + } + } + } else { + return -1; + } + return index; +} + +int distribute_pakages(struct rte_mbuf *mbuf) +{ + uint16_t dst_port = 0; + uint32_t index = mbuf_to_idx(mbuf, &dst_port); + if (index == -1) { + return TRANSFER_CURRENT_THREAD; + } + + uint16_t queue_id = 0; + uint32_t user_process_idx = 0; + int each_process_queue_num = get_global_cfg_params()->num_queue; + index = index % each_process_queue_num; + if (g_listen_ports[dst_port] != INVAILD_PROCESS_IDX) { + user_process_idx = g_listen_ports[dst_port]; + } else { + user_process_idx = g_user_ports[dst_port]; + } + + if (user_process_idx == INVAILD_PROCESS_IDX) { + return TRANSFER_KERNEL; + } + + if (get_global_cfg_params()->seperate_send_recv) { + queue_id = user_process_idx * each_process_queue_num + (index / 2) * 2; + } else { + queue_id = user_process_idx * each_process_queue_num + index; + } + if (queue_id != 0) { + if (user_process_idx == 0) { + transfer_tcp_to_thread(mbuf, queue_id); + } else { + char mbuf_and_queue_id[TRANSFER_TCP_MUBF_LEN]; + concat_mbuf_and_queue_id(mbuf, queue_id, mbuf_and_queue_id, TRANSFER_TCP_MUBF_LEN); + transfer_pkt_to_other_process(mbuf_and_queue_id, user_process_idx, + TRANSFER_TCP_MUBF_LEN, false); + } + return TRANSFER_OTHER_THREAD; + } else { + return TRANSFER_CURRENT_THREAD; + } + + return TRANSFER_KERNEL; +} + +void gazelle_listen_thread(void *arg) +{ + struct cfg_params *cfg_param = get_global_cfg_params(); + recv_pkts_from_other_process(cfg_param->process_idx, arg); +} + +void flow_init(void) +{ + struct protocol_stack_group *stack_group = get_protocol_stack_group(); + init_listen_and_user_ports(); + + /* run to completion mode does not currently support multiple process */ + if (!use_ltran() && !get_global_cfg_params()->stack_mode_rtc) { + char name[PATH_MAX]; + sem_init(&stack_group->sem_listen_thread, 0, 0); + sprintf_s(name, sizeof(name), "%s", "listen_thread"); + struct sys_thread *thread = sys_thread_new(name, gazelle_listen_thread, + (void*)(&stack_group->sem_listen_thread), 0, 0); + free(thread); + sem_wait(&stack_group->sem_listen_thread); + } +} diff --git a/src/lstack/netif/lstack_vdev.c b/src/lstack/netif/lstack_vdev.c index f78e48a..3703092 100644 --- a/src/lstack/netif/lstack_vdev.c +++ b/src/lstack/netif/lstack_vdev.c @@ -33,6 +33,7 @@ #include "lstack_protocol_stack.h" #include "gazelle_reg_msg.h" #include "lstack_lwip.h" +#include "lstack_flow.h" #include "lstack_vdev.h" /* INUSE_TX_PKTS_WATERMARK < VDEV_RX_QUEUE_SZ; -- 2.27.0
Locations
Projects
Search
Status Monitor
Help
Open Build Service
OBS Manuals
API Documentation
OBS Portal
Reporting a Bug
Contact
Mailing List
Forums
Chat (IRC)
Twitter
Open Build Service (OBS)
is an
openSUSE project
.