接收:
void gEthTCP_AgentTaskHandler(void)
{
int32_t err = ERR_OK;
struct pbuf *pbuf = NULL;
int32_t received_bytes = 0;
uint32_t addr_len = 0;
int32_t socket_cnt = 0;
struct srv_eth_agent_subscriber *subscriber = NULL;
struct component_config *component = LocalComponentConfig();
struct timeval select_timeout;
int32_t max_fd = -1;
fd_set read_fds;
for (;;)
{
FD_ZERO(&read_fds);
select_timeout.tv_sec = ETH_AGENT_TCP_SELECT_TIMEOUT_S;
select_timeout.tv_usec = ETH_AGENT_TCP_SELECT_TIMEOUT_MS;
for (socket_cnt = 0; socket_cnt < NUMBER_OF_ETHNET_AGENT_TCP_INTS; socket_cnt++)
{
if ((socket_cnt > NUMBER_OF_ETHNET_AGENT_TCP_INTS) || (!component->tcp_subscriber[socket_cnt].valid))
{
continue;
}
if (component->tcp_subscriber[socket_cnt].protocol == ETH_AGENT_SOCKET_TCP_SERVER)
{
if ( component->tcp_subscriber[socket_cnt].server_sock_fd >= 0 )
{
max_fd = max_fd < component->tcp_subscriber[socket_cnt].server_sock_fd ? component->tcp_subscriber[socket_cnt].server_sock_fd : max_fd;
FD_SET(component->tcp_subscriber[socket_cnt].server_sock_fd, &read_fds);
if (component->tcp_subscriber[socket_cnt].client_sock_fd > FD_UNINIT)
{
max_fd = max_fd < component->tcp_subscriber[socket_cnt].client_sock_fd ? component->tcp_subscriber[socket_cnt].client_sock_fd : max_fd;
FD_SET(component->tcp_subscriber[socket_cnt].client_sock_fd, &read_fds);
}
continue;
}
if ((component->tcp_subscriber[socket_cnt].server_sock_fd = socket(AF_INET, SOCK_STREAM, 0)) == -1)
{
LOGGER_Printf("[ENET_AGENT] TCP Server Socket create error\r\n");
continue;
}
err = bind(component->tcp_subscriber[socket_cnt].server_sock_fd,
(struct sockaddr *)&component->tcp_subscriber[socket_cnt].conn_addr,
sizeof(struct sockaddr_in));
if (err < 0)
{
close(component->tcp_subscriber[socket_cnt].server_sock_fd);
LOGGER_Printf("[ENET_AGENT] TCP Socket bind error\r\n");
continue;
}
struct timeval receive_timeout =
{
.tv_sec = ETH_AGENT_TCP_RECV_TIMEOUT_MS,
.tv_usec = 0
};
err = setsockopt(component->tcp_subscriber[socket_cnt].server_sock_fd,
SOL_SOCKET,
SO_RCVTIMEO,
(char*)&receive_timeout,
sizeof(struct timeval));
#if SRV_ENABLE_ENET_AGENT_INFO
LOGGER_Printf("[ENET_AGENT]TCP Socket setsockopt error=%d\r\n", err);
#endif
if (err < 0)
{
close(component->tcp_subscriber[socket_cnt].server_sock_fd);
LOGGER_Printf("[ENET_AGENT] TCP Socket setsockopt error\r\n");
continue;
}
err = listen(component->tcp_subscriber[socket_cnt].server_sock_fd, 1);
if (err < 0)
{
close(component->tcp_subscriber[socket_cnt].server_sock_fd);
LOGGER_Printf("[ENET_AGENT] TCP Socket listen error\r\n");
continue;
}
addr_len = sizeof(struct sockaddr_in);
pbuf = pbuf_alloc(PBUF_TRANSPORT, ETH_AGENT_FRAME_BUF_MAX_LEN, PBUF_RAM);
if (!pbuf)
{
close(component->tcp_subscriber[socket_cnt].server_sock_fd);
LOGGER_Printf("[ENET_AGENT] TCP Socket pbuf_alloc error\r\n");
continue;
}
#if SRV_ENABLE_ENET_AGENT_INFO
LOGGER_Printf("[ENET_AGENT]TCP socket_cnt=%d,fd=%d accept\r\n",socket_cnt, component->tcp_subscriber[socket_cnt].server_sock_fd);
#endif
FD_SET(component->tcp_subscriber[socket_cnt].server_sock_fd, &read_fds);
if (max_fd < component->tcp_subscriber[socket_cnt].server_sock_fd)
{
max_fd = component->tcp_subscriber[socket_cnt].server_sock_fd;
}
}
/* TCP CLIENT */
else
{
if (component->tcp_subscriber[socket_cnt].connected)
{
max_fd = max_fd < component->tcp_subscriber[socket_cnt].client_sock_fd ? component->tcp_subscriber[socket_cnt].client_sock_fd : max_fd;
FD_SET(component->tcp_subscriber[socket_cnt].client_sock_fd, &read_fds);
continue;
}
if ((component->tcp_subscriber[socket_cnt].client_sock_fd = socket(AF_INET, SOCK_STREAM, 0)) == -1)
{
LOGGER_Printf("[ENET_AGENT] TCP client Socket create error\r\n");
continue;
}
int err = 0;
err = connect(component->tcp_subscriber[socket_cnt].client_sock_fd, (struct sockaddr *)&component->tcp_subscriber[socket_cnt].conn_addr,sizeof(struct sockaddr));
if (err < 0 )
{
select_timeout.tv_sec = 1;
select_timeout.tv_usec = 0;
closesocket(component->tcp_subscriber[socket_cnt].client_sock_fd);
component->tcp_subscriber[socket_cnt].client_sock_fd = FD_UNINIT;
component->tcp_subscriber[socket_cnt].connected = false;
LOGGER_Printf("[ENET_AGENT]TCP Client err=%d, connect failed!\r\n",err);
}
else
{
component->tcp_subscriber[socket_cnt].connected = true;
component->tcp_subscriber[socket_cnt].tx_socket_fd = socket_cnt;
max_fd = max_fd < component->tcp_subscriber[socket_cnt].client_sock_fd ? component->tcp_subscriber[socket_cnt].client_sock_fd : max_fd;
FD_SET(component->tcp_subscriber[socket_cnt].client_sock_fd, &read_fds);
if (component->tcp_subscriber[socket_cnt].connectedCbk)
{
component->tcp_subscriber[socket_cnt].connectedCbk();
}
LOGGER_Printf("[ENET_AGENT]TCP Client socket_cnt=%d, get connection!\r\n",socket_cnt);
}
}
}
err = select(max_fd + 1, &read_fds, NULL, NULL, &select_timeout);
if (err <= 0)
{
continue;
}
for (socket_cnt = 0; socket_cnt < NUMBER_OF_ETHNET_AGENT_TCP_INTS; socket_cnt++)
{
if (component->tcp_subscriber[socket_cnt].server_sock_fd >= 0)
{
if (FD_ISSET(component->tcp_subscriber[socket_cnt].server_sock_fd, &read_fds))
{
if(!component->tcp_subscriber[socket_cnt].connected)
{
//handle accept
component->tcp_subscriber[socket_cnt].client_sock_fd = accept(component->tcp_subscriber[socket_cnt].server_sock_fd,
(struct sockaddr *)&component->tcp_subscriber[socket_cnt].conn_addr,
&addr_len);
if (component->tcp_subscriber[socket_cnt].client_sock_fd < 0)
{
LOGGER_Printf("[ENET_AGENT]TCP socket_cnt=%d, accept timeout\r\n",socket_cnt);
continue;
}
else
{
LOGGER_Printf("[ENET_AGENT]TCP socket_cnt=%d, get connection!\r\n",socket_cnt);
component->tcp_subscriber[socket_cnt].tx_socket_fd = component->tcp_subscriber[socket_cnt].client_sock_fd;
component->tcp_subscriber[socket_cnt].connected = true;
if (component->tcp_subscriber[socket_cnt].connectedCbk)
{
component->tcp_subscriber[socket_cnt].connectedCbk();
}
break;
}
}
}
}
if (component->tcp_subscriber[socket_cnt].client_sock_fd >= 0)
{
if (FD_ISSET(component->tcp_subscriber[socket_cnt].client_sock_fd, &read_fds))
{
received_bytes = recv(component->tcp_subscriber[socket_cnt].client_sock_fd,
pbuf->payload,
ETH_AGENT_FRAME_BUF_MAX_LEN,
0);
LOGGER_Printf("[ENET_AGENT]TCP received_bytes=%d\r\n", received_bytes);
if (received_bytes > 0)
{
if (component->tcp_subscriber[socket_cnt].withPrivateProtocol)
{
if (component->eth_agent_unpack_cb)
{
component->eth_agent_unpack_cb(pbuf->payload, received_bytes);
}
}
else
{
ACORE_AGENT_rx_distribute_process(pbuf->payload, received_bytes, component->tcp_subscriber[socket_cnt].service_id);
}
LOGGER_Printf("[ENET_AGENT]TCP data: %s\r\n", pbuf->payload);
break;
}
else
{
//handle connection
closesocket(component->tcp_subscriber[socket_cnt].client_sock_fd);
component->tcp_subscriber[socket_cnt].client_sock_fd = FD_UNINIT;
component->tcp_subscriber[socket_cnt].connected = false;
if (component->tcp_subscriber[socket_cnt].disconnectedCbk)
{
component->tcp_subscriber[socket_cnt].disconnectedCbk();
}
LOGGER_Printf("[ENET_AGENT]TCP socket_cnt=%d, closed connection! == 1\r\n",socket_cnt);
break;
}
}
}
}
}
}
发送:
int32_t srv_eth_agent_tcp_send(int32_t handle, uint8_t *buf, uint16_t buf_len)
{
int32_t ret;
uint16_t seq;
static SemaphoreHandle_t xSemph_Send = NULL;
static StaticSemaphore_t xMutexBuffer;
struct component_config *component = LocalComponentConfig();
if ((handle < 0) || (handle >= NUMBER_OF_ETHNET_AGENT_TCP_INTS))
{
ret = (DMN_ERR_SEND_CLIENT | 0x04);
SYS_LOG(eLOG_LEVEL_ERROR, "Ethernet Agent input Client error = 0x%x",
ret);
return ret;
}
if (NULL == buf || buf_len <= 0)
{
ret = (DMN_ERR_INPUT_INCONFORMITY | 0x04);
SYS_LOG(eLOG_LEVEL_ERROR, "%s Ethernet Agent input error = 0x%x",
component->tcp_subscriber[handle].pName, ret);
return ret;
}
if (NULL == xSemph_Send)
{
xSemph_Send = xSemaphoreCreateMutexStatic(&xMutexBuffer);
if (NULL == xSemph_Send)
{
ret = DMN_ERR_SEM_CREATE;
SYS_LOG(eLOG_LEVEL_ERROR,
"%s Ethernet Agent Create Semphore error = 0x%x",
component->tcp_subscriber[handle].pName, ret);
return ret;
}
SYS_LOG(eLOG_LEVEL_INFO, "%s Ethernet Agent Create Semaphore Success",
component->tcp_subscriber[handle].pName);
}
if (xSemaphoreTake(xSemph_Send, pdMS_TO_TICKS(100)))
{
#if MIDDLEWARE_FREERTOS_TCP_ENABLE
ret = FreeRTOS_send(component->tcp_subscriber[handle].socket_conn.xConnectedSocket, buf, buf_len, 0);
#endif
#if (MIDDLEWARE_LWIP_ENABLE == 1 || MIDDLEWARE_FREERTOS_TCP_ENABLE == 0)
ret = send(component->tcp_subscriber[handle].client_sock_fd, buf, buf_len, 0);
#endif
xSemaphoreGive(xSemph_Send);
if (ret < 0)
{
ret = (DMN_ERR_SEND_MESSAGE | (-ret));
SYS_LOG(eLOG_LEVEL_ERROR,
"%s Ethernet Agent FreeRTOS_send send msg error = 0x%x",
component->tcp_subscriber[handle].pName, ret);
return ret;
}
}
else
{
ret = DMN_ERR_SEM_TIMEOUT;
SYS_LOG(eLOG_LEVEL_ERROR,
"%s Ethernet Agent Get Semphore timeout error = 0x%x",
component->tcp_subscriber[handle].pName, ret);
return ret;
}
return pdTRUE;
}
|