tcp粘包处理
tcp是流式传输的,是可靠的、有序的(常说的“安全”实际是指可靠交付,tcp本身并不提供加密)。
udp是数据报协议,是不可靠的(但udp保留消息边界,一次recvfrom对应一个数据报,不存在粘包问题)。
面试中经常被问到tcp粘包是如何处理的,通过百度和自己的理解,这里做笔记记录。
如果有不对,请指正~
参考:https://bbs.csdn.net/topics/380167545
业务层认识tcp传输及分析方案
tcp是流式传输的,tcp协议栈可以保证传输过程的顺序,可靠性:
也就是说,发送端调用send发送的数据,接收端一定能按照发送顺序依次接收到(但不保证按send的次数和边界接收)。
tcp是流式传输的,协议栈只是把接收到的数据放入到对应连接fd的缓冲区中。(待从源码确定)
1:分析tcp协议栈的接收:
1:协议栈只是负责把接收到的数据放入该连接的接收缓冲区(内核socket缓冲区,应用层再通过recv读取;很大的文件传输时,可能一次放不下)。
==》自己的误区:tcp是可靠的流式传输,缓冲区中接收到的数据必然是完整、有序的,只会涉及粘包处理,不涉及其他;这里要考虑大包的多次接收。
2:获取缓冲区中的包,可能是多个小包粘合在一起的大包(需要做粘包处理)
==》小包的粘包问题需要在业务层做处理(发送端的nagle算法会把多个小包合并后再发送,是产生粘包的原因之一)。
3:获取缓冲区中的包,可能是一个大包拆分出的多个小包进行传输 (接收到半包)
==》默认按MTU/MSS大小分段,协议栈会拆包,这个应该是协议栈做了处理,收到还是一个大包,我们只需要循环接收做业务处理(并不需要组包处理)。==》recv时,根据长度用while循环做接收,确保包的完整。(拆包后的包应该是依次接收到,从缓冲区中获取到)(补充:tcp没有消息边界,一次recv返回多少字节取决于当时缓冲区里已到达的数据,可能只是一个大包的一部分,所以业务层仍需按长度/分隔符把多次recv得到的数据拼成完整的包。)
4:放入缓冲区中数据的完整性,协议栈是否是一次把一整个包放进来的,应该是的(看看源码吧,协议栈是如何处理的?) ==》recv时,需要用while循环做多次的接收。
2:tcp的接收,我们需要关注的问题:
1:对粘包问题做拆包处理。(需要业务层定义协议,增加结束符,固定长度,或者指定长度)
2:对协议栈大包拆包后的数据做组包处理/持续接收(半包)。 ==》流式可靠顺序传输的
==》同一个连接,用while循环一直进行接收,应该是可以保证持续接收到的数据是这个大包的数据。
==》根据业务,需要在业务层对大包进行自定义协议拆包后组包(用户控制发小包),或者根据长度持续接收(确保一个大包的接收完整),或者用缓冲区保存(可能多个连接同时处理的业务)等处理。
3:recv一次时,不一定能获取到一整个包。(半包)==》应该还是只有大于MTU时拆包场景(补充:不只是大于MTU时,接收缓冲区大小、recv传入的长度、数据到达的时序等都可能导致一次recv只拿到部分数据)。
==》同2,while多次循环接收,通过长度/校验码确保数据的完整可靠。
4:如果要关注业务层的消息机制,消息的完整性,正确性,需要业务层对消息做一定的协议处理。
3:总结及方案
1:tcp是流式安全的,传输接收到的数据必然是顺序的,有序的:
==》1:半包问题(协议栈拆包),多次while循环进行接收。
==》2:粘包问题,需要在业务层定义协议处理。
2:处理粘包问题的方案:
1:发送固定长度的字节,不够时补充特定字符如0
2:末尾终结符,如\r\n,如FTP协议
3:区分消息为头部和消息体,收到足够的数据确定包的完整性
4:混合使用在应用层做拆包和粘包的处理。(头+类型+长度+数据+尾)
实现一个tcp粘包处理的demo
这里没有考虑大包的拆包时,缓冲区的细节机制。
简单描述:
1:定义用户层协议,使用“头+data+尾部”(这里的头部和尾部自己定义比较随意)的方案,方便粘包处理以及保证数据完整性。(data数据区可以扩展协议)
2:用一个ringbuffer暂存接收到的数据(暂时不考虑半包问题,可以用多个ringbuffer保存不同fd的缓存做处理/发送端做拆包处理,ringbuffer接收后在业务层做组包处理)。
3:接收到数据后放入ringbuffer中,并根据头+尾检测ringbuffer中数据的完整性,这里只是做打印(业务处理以及组包协议定义等都可以自由扩展)
测试demo:
tcp_ringbuffer.h
#ifndef TCP_RINGBUFFER_H
#define TCP_RINGBUFFER_H

/*
 * Byte ring buffer used by the TCP sticky-packet demo.
 *
 * read_pos/write_pos are byte offsets into data[0, size). read_pos == write_pos
 * means "empty"; ringbuffer_put always keeps one byte free, so at most
 * size - 1 bytes can be stored at once.
 */
typedef struct RINGBUFF_T{
    void * data;             /* storage of `size` bytes, allocated by ringbuffer_create */
    unsigned int size;       /* capacity in bytes (ringbuffer_create rounds it up to a power of two) */
    unsigned int read_pos;   /* offset of the oldest unread byte */
    unsigned int write_pos;  /* offset where the next byte will be written */
}ringbuffer_t;

/* Allocate a ring buffer of (at least) `size` bytes. Returns NULL on failure. */
ringbuffer_t * ringbuffer_create(unsigned int size);

/* Free the buffer and its storage. Passing NULL is allowed. */
void ringbuffer_destroy(ringbuffer_t * ring_buffer);

/* Append len bytes. Returns 0 on success, -1 if they do not fit (nothing is written). */
int ringbuffer_put(ringbuffer_t * ring_buffer, const char* buffer, unsigned int len);

/*
 * Demo-specific framing check: returns the number of buffered bytes when they
 * hold at least one minimal frame and end with the frame tail
 * "<tail>0D0AFEFE"; otherwise returns -1.
 */
int ringbuffer_get_len(ringbuffer_t *ring_buffer);

/*
 * Copy ALL buffered bytes into buffer and empty the ring. len must be strictly
 * greater than the number of buffered bytes. Returns 0 on success, -1 otherwise.
 */
int ringbuffer_get(ringbuffer_t * ring_buffer, char * buffer, unsigned int len);

/* Discard all buffered data. */
void ringbuffer_reset(ringbuffer_t * ring_buffer);

/* Number of bytes currently stored. */
int ringbuffer_use_len(ringbuffer_t * ring_buffer);

/* size minus the stored bytes (ringbuffer_put accepts at most one byte less than this). */
int ringbuffer_space_len(ringbuffer_t * ring_buffer);

/* NOTE: returns 0 when the buffer IS empty and -1 otherwise (not a C boolean). */
int ringbuffer_isempty(ringbuffer_t * ring_buffer);

/* NOTE: returns 0 when space_len is 0 and -1 otherwise (not a C boolean). */
int ringbuffer_isfull(ringbuffer_t * ring_buffer);

#endif /* TCP_RINGBUFFER_H */
tcp_ringbuffer.c
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include "tcp_ringbuffer.h"
/* Returns 1 when n is a power of two, 0 for zero and for every other value. */
static inline __attribute__((const))
int is_power_of_2(unsigned long n)
{
    /* A power of two has exactly one set bit, so clearing its lowest set bit
       (n & (n - 1)) leaves nothing behind. */
    return n && !(n & (n - 1));
}
/*
 * Round n up to the next power of two. Values that already are powers of two
 * (and 0) are returned unchanged. If n's highest set bit is the top bit of
 * unsigned long, the doubled result wraps to 0.
 */
static unsigned long roundup_power_of_two(unsigned long n)
{
    if ((n & (n - 1)) == 0)
        return n;

    /* Strip the lowest set bit repeatedly until only the highest one is left. */
    unsigned long highest = n;
    while (highest & (highest - 1))
        highest &= highest - 1;

    return highest << 1;
}
/*
 * Allocate an empty ring buffer whose capacity is `size` rounded up to a power
 * of two.
 *
 * Returns NULL when size is 0, when the rounded capacity does not fit in an
 * unsigned int (size > 2^31 would otherwise silently become a 0-byte buffer),
 * or when an allocation fails.
 */
ringbuffer_t * ringbuffer_create(unsigned int size)
{
    unsigned long capacity = size;
    if (!is_power_of_2(capacity)) {
        capacity = roundup_power_of_two(capacity);
    }
    /* 0 stays 0 after rounding, and a result wider than unsigned int would be
       truncated when stored in ring_buffer->size. */
    if (capacity == 0 || (unsigned int)capacity != capacity) {
        printf("create ringbuffer error \n");
        return NULL;
    }

    ringbuffer_t *ring_buffer = malloc(sizeof *ring_buffer);
    if (ring_buffer == NULL) {
        printf("create ringbuffer error \n");
        return NULL;
    }

    ring_buffer->data = malloc(capacity);
    if (ring_buffer->data == NULL) {
        printf("create ringbuffer data error \n");
        free(ring_buffer);
        return NULL;
    }

    ring_buffer->size = (unsigned int)capacity;
    ring_buffer->read_pos = 0;
    ring_buffer->write_pos = 0;
    return ring_buffer;
}
/* Release the storage and the ring buffer object itself. NULL is ignored. */
void ringbuffer_destroy(ringbuffer_t * ring_buffer)
{
    if (ring_buffer == NULL)
        return;

    free(ring_buffer->data);     /* free(NULL) is a no-op */
    ring_buffer->data = NULL;
    free(ring_buffer);
}
/*
 * Append len bytes from buffer to the ring.
 *
 * Returns 0 on success and -1 when the bytes do not fit (the ring is left
 * unchanged) or when the arguments are invalid. One byte is always kept free
 * so that read_pos == write_pos unambiguously means "empty"; hence at most
 * size - 1 bytes can be stored.
 */
int ringbuffer_put(ringbuffer_t * ring_buffer, const char* buffer, unsigned int len)
{
    if (ring_buffer == NULL || ring_buffer->data == NULL || (buffer == NULL && len != 0))
        return -1;

    unsigned int size = ring_buffer->size;
    unsigned int rd = ring_buffer->read_pos;
    unsigned int wr = ring_buffer->write_pos;
    unsigned int free_len = (wr >= rd) ? size - wr + rd : rd - wr;

    /* Strict comparison keeps the one reserved byte free. */
    if (len >= free_len)
        return -1;
    if (len == 0)
        return 0;

    /* Arithmetic on void * is a GNU extension; address the storage as bytes. */
    char *base = ring_buffer->data;
    unsigned int tail_room = size - wr;
    unsigned int first = (len < tail_room) ? len : tail_room;

    memcpy(base + wr, buffer, first);
    memcpy(base, buffer + first, len - first);   /* copies 0 bytes when nothing wraps */

    /* len < free_len <= size, so the sum wraps at most once. */
    ring_buffer->write_pos = (wr + len) % size;
    return 0;
}
/*
 * Demo framing check on the buffered bytes.
 *
 * Returns the number of buffered bytes when the buffer holds at least one
 * minimal frame ("FFFF0D0A<header><tail>0D0AFEFE") and its last bytes are the
 * frame tail "<tail>0D0AFEFE"; otherwise returns -1 (incomplete data).
 */
int ringbuffer_get_len(ringbuffer_t *ring_buffer)
{
    const char *frame_min = "FFFF0D0A<header><tail>0D0AFEFE";
    const char *end_str = "<tail>0D0AFEFE";
    const size_t min_len = strlen(frame_min);
    const size_t end_len = strlen(end_str);
    char check_end_str[20] = {0};   /* > end_len, so it stays NUL-terminated */

    if (ring_buffer == NULL || ring_buffer->data == NULL)
        return -1;

    int used = ringbuffer_use_len(ring_buffer);
    if (used < 0 || (size_t)used < min_len)
    {
        printf("ringbuffer data is error [%d], [%zu]\n", used, min_len);
        return -1;
    }

    /* void * arithmetic is non-standard; read the storage as bytes. */
    const char *base = ring_buffer->data;
    if (ring_buffer->write_pos >= end_len)
    {
        /* The tail is contiguous just before write_pos. */
        memcpy(check_end_str, base + ring_buffer->write_pos - end_len, end_len);
    }
    else
    {
        /* The tail wraps: its first part is at the end of the storage and the
           remaining write_pos bytes are at the start. Since used >= min_len >
           end_len, all of these bytes are valid buffered data. */
        size_t left_len = ring_buffer->write_pos;
        size_t right_len = end_len - left_len;
        memcpy(check_end_str, base + ring_buffer->size - right_len, right_len);
        memcpy(check_end_str + right_len, base, left_len);
    }
    printf("get check_end_str is %s \n", check_end_str);

    /* Byte comparison: strstr would stop early on an embedded NUL. */
    if (memcmp(check_end_str, end_str, end_len) != 0)
    {
        return -1;
    }
    return used;
}
/*
 * Copy ALL buffered bytes into buffer, then empty the ring.
 *
 * len must be strictly greater than the number of buffered bytes (leaving
 * room for a terminator the caller may add). Returns 0 on success, -1 when
 * buffer is too small. The copied bytes are not NUL-terminated by this function.
 */
int ringbuffer_get(ringbuffer_t * ring_buffer, char * buffer, unsigned int len)
{
    int data_len = ringbuffer_use_len(ring_buffer);
    if ((unsigned int)data_len >= len)
    {
        printf("para buffer is not enough space \n");
        return -1;
    }

    /* void * arithmetic is non-standard; read the storage as bytes. */
    const char *base = ring_buffer->data;
    unsigned int rd = ring_buffer->read_pos;
    unsigned int wr = ring_buffer->write_pos;

    if (wr > rd)
    {
        printf("get data from ringbuffer len: [%u] \n", wr - rd);
        memcpy(buffer, base + rd, (size_t)data_len);
    }
    else if (wr < rd)
    {
        /* Wrapped: [rd, size) followed by [0, wr). */
        unsigned int first = ring_buffer->size - rd;
        memcpy(buffer, base + rd, first);
        memcpy(buffer + first, base, wr);
    }
    /* wr == rd: the ring is empty and there is nothing to copy. The old code
       fell into the wrap branch here and copied `size` bytes. */

    ring_buffer->write_pos = 0;
    ring_buffer->read_pos = 0;
    return 0;
}
/* Placeholder for reading device data into a ring buffer; not implemented, always returns 0. */
int ringbuffer_get_from_dev(void)
{
    return 0;
}
/* Placeholder for writing ring buffer data to a device; not implemented, always returns 0. */
int ringbuffer_put_to_dev(void)
{
    return 0;
}
/* Drop all buffered data by moving both cursors back to offset 0. */
void ringbuffer_reset(ringbuffer_t * ring_buffer)
{
    ring_buffer->write_pos = 0;
    ring_buffer->read_pos = 0;
}
/* Number of bytes currently stored between read_pos and write_pos. */
int ringbuffer_use_len(ringbuffer_t * ring_buffer)
{
    unsigned int rd = ring_buffer->read_pos;
    unsigned int wr = ring_buffer->write_pos;

    /* Wrapped case: data runs from rd to the end of storage, then from 0 to wr. */
    return (wr < rd) ? (int)(wr + ring_buffer->size - rd)
                     : (int)(wr - rd);
}
/* size minus the bytes stored (includes the byte ringbuffer_put keeps reserved). */
int ringbuffer_space_len(ringbuffer_t * ring_buffer)
{
    unsigned int rd = ring_buffer->read_pos;
    unsigned int wr = ring_buffer->write_pos;

    if (wr < rd)
        return rd - wr;

    /* Free area is [wr, size) plus [0, rd). */
    return ring_buffer->size - wr + rd;
}
/* Returns 0 when the buffer holds no data and -1 otherwise (0 means "yes, empty"). */
int ringbuffer_isempty(ringbuffer_t * ring_buffer)
{
    if (ringbuffer_use_len(ring_buffer) != 0)
        return -1;
    return 0;
}
/* Returns 0 when ringbuffer_space_len() is 0 and -1 otherwise (0 means "yes, full"). */
int ringbuffer_isfull(ringbuffer_t * ring_buffer)
{
    if (ringbuffer_space_len(ring_buffer) != 0)
        return -1;
    return 0;
}
tcp_sticky_bag.c
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <errno.h>
#include <fcntl.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <sys/epoll.h>
#include "tcp_ringbuffer.h"
/* Forward declarations for the sticky-packet demo server. */
int demo_accept_exec(int epfd, int listenfd);
int demo_recv_exec(ringbuffer_t *ringbuff, int epfd, int connectfd);
int demo_init_socket(void);
int demo_server_exec(int listenfd);
int check_recv_data(char *data, int len);
/*
 * Add O_NONBLOCK to fd's file status flags (edge-triggered epoll needs
 * non-blocking descriptors). Returns 0 on success; if reading the flags
 * fails, returns that negative fcntl result; if setting them fails, returns -1.
 */
int SetNonblock(int fd) {
    int current = fcntl(fd, F_GETFL, 0);
    if (current < 0) {
        return current;
    }
    return (fcntl(fd, F_SETFL, current | O_NONBLOCK) < 0) ? -1 : 0;
}
/*
 * Create a non-blocking TCP socket listening on 0.0.0.0:9000.
 *
 * Returns the listening fd, or -1 on any failure. On failure the socket is
 * closed. The old code ignored bind() and still returned the fd when
 * listen() failed, so callers believed the server was up.
 */
int demo_init_socket(void)
{
    int fd = socket(AF_INET, SOCK_STREAM, 0);
    if (fd < 0)
    {
        printf("create socket fd error. \n");
        return -1;
    }
    if (SetNonblock(fd) < 0)
    {
        printf("set socket nonblock error. \n");
        close(fd);
        return -1;
    }

    /* Allow quick restarts while old connections linger in TIME_WAIT. */
    int optval = 1;
    if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &optval, sizeof(optval)) < 0)
    {
        printf("setsockopt SO_REUSEADDR error: %s \n", strerror(errno));
    }

    struct sockaddr_in server_addr;
    memset(&server_addr, 0, sizeof(server_addr));
    server_addr.sin_family = AF_INET;
    server_addr.sin_port = htons(9000);
    server_addr.sin_addr.s_addr = htonl(INADDR_ANY);

    if (bind(fd, (struct sockaddr*)&server_addr, sizeof(server_addr)) < 0)
    {
        printf("bind 9000 port error: %s \n", strerror(errno));
        close(fd);
        return -1;
    }
    if (listen(fd, 20) < 0)
    {
        printf("listen sock fd error. \n");
        close(fd);
        return -1;
    }
    printf(" listen 9000 port,create socket fd is %d \n", fd);
    return fd;
}
/*
 * Run the edge-triggered epoll event loop for listenfd until epoll_wait fails.
 *
 * Takes ownership of listenfd: it is closed on every return path, as is the
 * epoll fd. Returns 0 on a clean exit, -1 on setup or epoll_wait failure.
 *
 * NOTE(review): one ring buffer is shared by all client connections, so data
 * from concurrent clients is interleaved; a per-fd buffer would be needed to
 * serve more than one client correctly.
 */
int demo_server_exec(int listenfd)
{
    int epfd = epoll_create(1);
    if (epfd == -1)
    {
        printf("create epoll error \n");
        close(listenfd);
        return -1;
    }

    struct epoll_event event;
    memset(&event, 0, sizeof(event));
    event.data.fd = listenfd;
    event.events = EPOLLIN | EPOLLET;
    if (epoll_ctl(epfd, EPOLL_CTL_ADD, listenfd, &event) == -1)
    {
        printf("add listenfd to epoll error \n");
        close(listenfd);
        close(epfd);
        return -1;
    }

    ringbuffer_t * ringbuff = ringbuffer_create(1024 * 4);
    if (ringbuff == NULL)
    {
        printf("create ringbuff error, ringbuff is null \n");
        close(listenfd);
        close(epfd);
        return -1;
    }

    struct epoll_event event_wait[1024];
    int rc = 0;
    printf("start to exex server: \n");
    while (1)
    {
        int nready = epoll_wait(epfd, event_wait, 1024, 1000);
        if (nready < 0)
        {
            if (errno == EINTR)
            {
                printf("epoll_wait return and errno is EINTR \n");
                continue;
            }
            printf("epoll_wait error. \n");
            rc = -1;
            break;
        }

        /* nready == 0 (timeout) simply skips the loop below. */
        for (int i = 0; i < nready; i++)
        {
            int fd = event_wait[i].data.fd;
            unsigned int events = event_wait[i].events;

            if (events & EPOLLIN)
            {
                if (fd == listenfd)
                {
                    demo_accept_exec(epfd, listenfd);
                }
                else
                {
                    demo_recv_exec(ringbuff, epfd, fd);
                }
            }

            if (events & (EPOLLERR | EPOLLHUP))
            {
                /* demo_recv_exec() already closes the fd when it reads EOF.
                   Closing it again could close an unrelated descriptor that
                   reused the number, so only clean up fds that are still open. */
                if (fcntl(fd, F_GETFD) == -1)
                {
                    continue;
                }
                printf("epoll error [EPOLLERR | EPOLLHUP].\n");
                epoll_ctl(epfd, EPOLL_CTL_DEL, fd, NULL);
                close(fd);
                if (fd == listenfd)
                {
                    listenfd = -1;   /* already closed; skip it in the cleanup below */
                }
            }
        }
    }

    ringbuffer_destroy(ringbuff);
    close(epfd);
    if (listenfd >= 0)
    {
        close(listenfd);
    }
    return rc;
}
/*
 * Accept every pending connection on the (edge-triggered, non-blocking)
 * listenfd and register each client with epfd for EPOLLIN | EPOLLET.
 *
 * The backlog must be drained completely: with EPOLLET no new event arrives
 * for connections that are already queued. A client that cannot be set up is
 * closed and draining continues.
 * Returns 0 if every accepted client was registered, -1 if any failed.
 */
int demo_accept_exec(int epfd, int listenfd)
{
    int rc = 0;
    for (;;)
    {
        struct sockaddr_in cliaddr;
        socklen_t clilen = sizeof(cliaddr);   /* accept() updates it, so reset it on every call */
        int clifd = accept(listenfd, (struct sockaddr*)&cliaddr, &clilen);
        if (clifd < 0)
        {
            if (errno == EINTR || errno == ECONNABORTED)
            {
                continue;   /* transient: try the next queued connection */
            }
            break;          /* EAGAIN/EWOULDBLOCK: backlog drained; otherwise give up this round */
        }

        if (SetNonblock(clifd) < 0)
        {
            printf(" set client nonblock error . \n");
            close(clifd);
            rc = -1;
            continue;
        }

        struct epoll_event clifd_event;
        memset(&clifd_event, 0, sizeof(clifd_event));
        clifd_event.data.fd = clifd;
        clifd_event.events = EPOLLIN | EPOLLET;
        if (epoll_ctl(epfd, EPOLL_CTL_ADD, clifd, &clifd_event) == -1)
        {
            printf(" epoll ctl error . \n");
            close(clifd);
            rc = -1;
            continue;
        }
        printf("accept success. [%s:%d] connected \n", inet_ntoa(cliaddr.sin_addr), ntohs(cliaddr.sin_port));
    }
    return rc;
}
/*
 * Drain connectfd (non-blocking, edge-triggered) into ringbuff. If the
 * buffered bytes end with a complete frame tail, pull them all out and pass
 * them to check_recv_data().
 *
 * On EOF the fd is removed from epfd and closed. Returns 0 normally, including
 * when the buffered data is not yet a complete frame. Returns -1 if the frame
 * copy cannot be allocated or read from the ring buffer.
 */
int demo_recv_exec(ringbuffer_t *ringbuff, int epfd, int connectfd)
{
    printf("start to recv \n");

    /* Read at most 1024 bytes per call. The extra byte holds a NUL so the
       chunk can be printed with %s even when read() fills all 1024 bytes. */
    char recv_data[1024 + 1];
    while (1)
    {
        ssize_t datalen = read(connectfd, recv_data, sizeof(recv_data) - 1);
        if (datalen > 0)
        {
            recv_data[datalen] = '\0';
            printf("recv data is [%s] [%zu] [%zd]\n", recv_data, strlen(recv_data), datalen);
            if (ringbuffer_put(ringbuff, recv_data, (unsigned int)datalen) == 0)
            {
                printf("put to ringbuff success \n");
            }
            else
            {
                printf("put to ringbuff failed, drop [%zd] bytes \n", datalen);
            }
            continue;
        }

        if (datalen == 0)
        {
            /* Peer closed the connection. */
            if (epoll_ctl(epfd, EPOLL_CTL_DEL, connectfd, NULL) == -1)
            {
                printf("client disconnection error from epoll \n");
            }
            else
            {
                printf("client disconnected success,clientfd is [%d] \n", connectfd);
            }
            close(connectfd);
            break;
        }

        /* datalen < 0. The old test `errno == EWOULDBLOCK && errno == EINTR`
           was always false, so EINTR aborted the drain under EPOLLET. */
        if (errno == EINTR)
        {
            continue;
        }
        if (errno != EAGAIN && errno != EWOULDBLOCK)
        {
            printf("DDDDD recv data len <0 \n");
        }
        break;   /* EAGAIN/EWOULDBLOCK: socket drained for this edge */
    }

    int ringbuff_data_len = ringbuffer_get_len(ringbuff);
    if (ringbuff_data_len == -1)
    {
        /* Not an error for a stream: the frame tail has not arrived yet. */
        printf("get ringbuff dada error. \n");
        return 0;
    }
    printf("get ringbuff data len is %d \n", ringbuff_data_len);

    /* +1 keeps the copy NUL-terminated for the string-based parsing below. */
    char *data = calloc((size_t)ringbuff_data_len + 1, 1);
    if (data == NULL)
    {
        printf("malloc ringbuff data error. \n");
        return -1;
    }
    if (ringbuffer_get(ringbuff, data, (unsigned int)ringbuff_data_len + 1) != 0)
    {
        free(data);
        return -1;
    }
    printf("get all data is [%zu][%s] \n", strlen(data), data);
    check_recv_data(data, ringbuff_data_len);
    free(data);
    return 0;
}
/*
 * Handle one frame payload (the bytes between the header and the tail).
 * For the demo it only prints the payload.
 * Returns 0 on success, -1 for a NULL pointer, a negative length or an
 * allocation failure. A negative len used to reach memcpy as a huge size_t.
 */
int exec_one_package_data(char* data, int len)
{
    if (data == NULL || len < 0)
    {
        printf("invalid package payload [%d] \n", len);
        return -1;
    }

    /* Copy into a NUL-terminated buffer so the payload can be printed with %s. */
    char *print_data = calloc((size_t)len + 1, 1);
    if (print_data == NULL)
    {
        return -1;
    }
    memcpy(print_data, data, (size_t)len);
    printf("111 one package data is [%s][%zu][%d] \n", print_data, strlen(print_data), len);
    free(print_data);
    return 0;
}
/*
 * Bounded substring search: returns a pointer to the first occurrence of
 * needle inside hay[0, hay_len), or NULL. Unlike strstr it never reads past
 * hay_len, so it cannot match a header belonging to a later frame.
 */
static char *find_in_frame(char *hay, size_t hay_len, const char *needle, size_t needle_len)
{
    if (needle_len == 0 || needle_len > hay_len)
        return NULL;
    for (size_t i = 0; i + needle_len <= hay_len; i++)
    {
        if (memcmp(hay + i, needle, needle_len) == 0)
            return hay + i;
    }
    return NULL;
}

/*
 * Handle one frame of len bytes that ends with the tail "<tail>0D0AFEFE".
 * The frame should start with the header "FFFF0D0A<header>". Garbage before
 * the header is reported and skipped, and the payload is passed to
 * exec_one_package_data().
 * Returns 0 when a payload was extracted, -1 when the frame is malformed.
 */
int exec_one_data(char* data, int len)
{
    const char *start_str = "FFFF0D0A<header>";
    const size_t start_len = strlen(start_str);
    const size_t frame_overhead = strlen("FFFF0D0A<header><tail>0D0AFEFE");

    if (data == NULL || len < 0)
        return -1;

    char *print_data = calloc((size_t)len + 1, 1);
    if (print_data != NULL)
    {
        memcpy(print_data, data, (size_t)len);
        printf("one package data is [%s][%zu][%d] \n", print_data, strlen(print_data), len);
        free(print_data);
    }

    char *ops = find_in_frame(data, (size_t)len, start_str, start_len);
    if (ops == NULL)
    {
        /* The old code fell through to the "ops != data" branch here and
           dereferenced NULL + header length. */
        printf("package data is error, not find start data. \n");
        fwrite(data, 1, (size_t)len, stdout);
        printf("\n");
        return -1;
    }

    size_t skipped = (size_t)(ops - data);
    if (skipped != 0)
    {
        printf("recv package data is error. ");
        fwrite(data, 1, (size_t)len, stdout);
        printf("\n");
    }

    /* Payload = frame minus leading garbage, header and tail. */
    if ((size_t)len < skipped + frame_overhead)
    {
        printf("package data is too short, drop it. \n");
        return -1;
    }
    exec_one_package_data(ops + start_len, (int)((size_t)len - skipped - frame_overhead));
    return 0;
}
/*
 * Bounded substring search over hay[0, hay_len); returns the first match of
 * needle, or NULL. It does not depend on a NUL terminator at hay[hay_len].
 */
static char *find_frame_tail(char *hay, size_t hay_len, const char *needle, size_t needle_len)
{
    if (needle_len == 0 || needle_len > hay_len)
        return NULL;
    for (size_t i = 0; i + needle_len <= hay_len; i++)
    {
        if (memcmp(hay + i, needle, needle_len) == 0)
            return hay + i;
    }
    return NULL;
}

/*
 * Split data[0, len) into frames, each ending with "<tail>0D0AFEFE", and pass
 * each frame to exec_one_data(). Bytes after the last tail are reported as lost.
 * Data no longer than a minimal (empty-payload) frame is rejected.
 * Returns 0 after processing, -1 when the input is rejected.
 */
int check_recv_data(char *data, int len)
{
    const char *end_str = "<tail>0D0AFEFE";
    const size_t end_len = strlen(end_str);
    const size_t min_len = strlen("FFFF0D0A<header><tail>0D0AFEFE");

    if (data == NULL || len < 0)
    {
        return -1;
    }
    if ((size_t)len <= min_len)
    {
        printf("get data from ringbuff is error, and throw away data : %.*s.", len, data);
        return -1;
    }

    char *cursor = data;
    char *end = data + len;
    char *ops;
    while ((ops = find_frame_tail(cursor, (size_t)(end - cursor), end_str, end_len)) != NULL)
    {
        int frame_len = (int)(ops + end_len - cursor);
        exec_one_data(cursor, frame_len);
        cursor = ops + end_len;
    }

    if (cursor != end)
    {
        int rest = (int)(end - cursor);
        printf("there is loss data: [%d][%.*s] \n", rest, rest, cursor);
    }
    return 0;
}
/*
 * Entry point: start the demo server on port 9000. The exit status reports
 * whether setup or the event loop failed.
 */
int main(void)
{
    int fd = demo_init_socket();
    if (fd < 0)
    {
        printf("create fd error is %d \n", fd);
        return EXIT_FAILURE;
    }
    printf("create fd success is %d \n", fd);

    /* demo_server_exec takes ownership of fd and returns -1 on failure. */
    int rc = demo_server_exec(fd);
    printf("main func end\n");
    return (rc == 0) ? EXIT_SUCCESS : EXIT_FAILURE;
}
|