简介
本文是上一篇的延续继dpdk从给定的port/queue捕获流量_晓镁的博客-CSDN博客
rte_ring_dequeue_burst
result = rte_ring_dequeue_burst(arg0,arg1,arg2,arg3) 将多个对象从一个环形队列ring取出,直到达到最大数量。 该函数调用多消费者或单消费者批量出队列,取决于在创建ring时指定的默认行为。
arg0为环形队列ring指针 arg1为 指用数据包填充的void*指针数组指针,示例为void* dequeued[]数组 arg2为从环出列到 obj_table 的对象数。 arg3如果如果非NULL,则为返回出队完成后剩余的环条目数。 返回值为出队的对象数。
#include <stdbool.h>
#include <sys/time.h>
#include <time.h>
#include <string.h>
#include <stdio.h>
#include <signal.h>
#include <arpa/inet.h>
#include <dpdk/rte_ring.h>
#include <dpdk/rte_lcore.h>
#include <dpdk/rte_log.h>
#include <dpdk/rte_mbuf.h>
#include <dpdk/rte_branch_prediction.h>
#include <dpdk/rte_tcp.h>
#include <dpdk/rte_ethdev.h>
#include <dpdk/rte_ether.h>
#include <dpdk/rte_ethdev.h>
#include <dpdk/rte_ip.h>
#include <dpdk/rte_eal.h>
#include "core_write.h"
#include "redis.h"
#define MIN(a,b) (((a)<(b))?(a):(b))
#define RTE_LOGTYPE_DPDKCAP RTE_LOGTYPE_USER1
#include <time.h>
#include <arpa/inet.h>
#include <unistd.h>
#include "hash.h"
#include "redis.h"
#include <dpdk/rte_lcore.h>
#include <dpdk/rte_log.h>
#include <dpdk/rte_tcp.h>
#include <dpdk/rte_ethdev.h>
#include <dpdk/rte_ether.h>
#include <dpdk/rte_ethdev.h>
#include <dpdk/rte_ip.h>
#include <dpdk/rte_eal.h>
#define RTE_LOGTYPE_DPDKCAP RTE_LOGTYPE_USER1
/*
* process the packets form the ring
*/
int write_core(const struct core_write_config * config) {
unsigned int packet_length, wire_packet_length = 0;
int result;
void* dequeued[DPDKCAP_WRITE_BURST_SIZE];
struct rte_mbuf* bufptr;
int retval = 0;
//Init current time
int i = 0;
//Init stats
*(config->stats) = (struct core_write_stats) {
.core_id=rte_lcore_id(),
.packets = 0,
.bytes = 0,
.compressed_bytes = 0,
};
RTE_LOG(INFO, DPDKCAP, "Core %d is process packet.\n",
rte_lcore_id());
for (;;) {
if (unlikely(*(config->stop_condition))) {
break;
}
//从环形队列ring获取packets
result = rte_ring_dequeue_burst(config->ring,
dequeued, DPDKCAP_WRITE_BURST_SIZE);
if (result == 0) {
continue;
}
//从环形ring取到数据计数
config->stats->packets += result;
for (i = 0; i < result; i++) {
// 逐一处理数据包
bufptr = dequeued[i];
// 基于hashtable对数据包进行分析统计
addPacket(config->table,bufptr);
// 数据流量统计
packet_length = wire_packet_length;
config->stats->bytes += packet_length;
// Free buffer
rte_pktmbuf_free(bufptr);
}
}
RTE_LOG(INFO, DPDKCAP, "count core %d\n", rte_lcore_id());
return retval;
}
数据包分析hash表统计
addPacket()函数pkt为数据包结构体。
数据包分析时首先从以太头、ip报文、tcp/udp/icmp逐层offset偏移进行解包分析。 通过原目的ip计算哈希key值,value为struct info结构体,该结构体记录数据包分析结果。
如果哈希冲突怎么办?
如果哈希冲突,那么基于原有的哈希节点创建链表,把原有哈希节点的next指针指向冲突的节点。 这是典型的哈希冲突处理方式。
/* 计算hashtable下标 */
int getIndex(uint32_t src,uint32_t dst){
uint32_t index = src+dst;
return ((index>>16) + (index&65535))&65535;
}
/* 增加一个数据包 */
int addPacket(struct info **table,struct rte_mbuf *pkt){
struct ipv4_hdr *ip;
struct ether_hdr *eth_hdr = rte_pktmbuf_mtod_offset(pkt, struct ether_hdr *,0);
ip = rte_pktmbuf_mtod_offset(pkt, struct ipv4_hdr *,sizeof(struct ether_hdr));
/* 处理IPv4数据包 */
if (eth_hdr->ether_type == 0x0008) {
int index = getIndex(ip->src_addr,ip->dst_addr);
struct info *p;
/* 查找hash table中该数据包对应的节点 */
if(table[index]==NULL){
p = (struct info *)malloc(sizeof(struct info));
p->src = ip->src_addr;
p->dst = ip->dst_addr;
p->icmp = 0;
p->tcp = 0;
p->http = 0;
p->udp = 0;
p->syn = 0;
p->ack = 0;
p->count = 0;
p->other = 0;
p->next = NULL;
p->time = time(0);
table[index] = p;
}else{
p = table[index];
while(p->next != NULL){
if(p->src == ip->src_addr && p->dst == ip->dst_addr){
break;
}
p = p->next;
}
/* p是尾节点并且和当前数据包信息不匹配 */
if(p->next != NULL){
if(p->src != ip->src_addr || p->dst != ip->dst_addr){
p->next = (struct info *)malloc(sizeof(struct info));
p = p->next;
p->src = ip->src_addr;
p->dst = ip->dst_addr;
p->icmp = 0;
p->tcp = 0;
p->http = 0;
p->udp = 0;
p->syn = 0;
p->ack = 0;
p->count = 0;
p->other = 0;
p->time = time(0);
p->next = NULL;
}
}
}
p->count += pkt->data_len;
/* 处理tcp数据包 */
if(ip->next_proto_id ==6){
p->tcp++;
struct tcp_hdr *tcp = rte_pktmbuf_mtod_offset(pkt, struct tcp_hdr *,
sizeof(struct ether_hdr)+sizeof(struct ipv4_hdr));
if(ntohs(tcp->src_port) == 80 || ntohs(tcp->src_port) == 443 || ntohs(tcp->src_port) == 8080 || ntohs(tcp->dst_port)==443 || ntohs(tcp->dst_port)==80 || ntohs(tcp->dst_port)==8080){
p->http++;
}
if(tcp->tcp_flags & 2){
p->syn++;
}else if(tcp->tcp_flags & 16){
p->ack++;
}
/* 处理udp数据包 */
}else if(ip->next_proto_id ==17){
p->udp++;
/* 处理icmp数据包 */
}else if(ip->next_proto_id ==1){
p->icmp++;
/* 其它协议的包 */
}else{
p->other++;
}
} else if (RTE_ETH_IS_IPV6_HDR(pkt->packet_type)) {
/* Fill acl structure */
} else {
/* Unknown type, drop the packet */
}
return 0;
}
void initTable(struct info **table){
int i;
for(i=0;i<HASH_TABLE_SIZE;i++){
table[i]=NULL;
}
}
|