前言
工作内容:为每个连接建立线程进行交互,并单独使用一个线程向所有连接持续发送相同数据.主线程执行等待连接. 本实现在ros下,使用C++语言和如下头文件所示的相关库.
1.头文件
#include <arpa/inet.h>
#include <netinet/in.h>
#include <ros/ros.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <cerrno>
#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <iostream>
#include <string>
#include <pthread.h>
#include<signal.h>
#define SERV_PORT 8888
#define LISTEN_COUNT 5
ros::Publisher* m_pub;
pthread_mutex_t cm_mutex;
struct connect_message{
int connfd;
struct sockaddr_in cliaddr;
};
std::vector<connect_message> all_connect;
2.main函数
代码如下(示例):
int main(int argc, char** argv) {
ros::init(argc, argv, "my_server");
ros::NodeHandle n;
ros::Subscriber server_sub;
server_sub = n.subscribe("sub", 1, subcallback);
ros::Publisher pub = n.advertise<std_msgs::String>("pub", 1);
m_pub = &pub;
ros::Rate loop_rate(1);
signal(SIGINT, mySigintHandler);
std::cout << "start server" << std::endl;
struct sockaddr_in cliaddr;
socklen_t cliaddr_len;
int connfd;
int listenfd = socket(AF_INET, SOCK_STREAM, 0);
if (listenfd == -1) {
std::cout << "Error socket" << std::endl;
return 0;
}
struct sockaddr_in addr;
addr.sin_family = AF_INET;
addr.sin_port = htons(SERV_PORT);
addr.sin_addr.s_addr = INADDR_ANY;
int opt = 1;
setsockopt(listenfd, SOL_SOCKET, SO_REUSEPORT, &opt, sizeof(opt));
if (bind(listenfd, (struct sockaddr*)&addr, sizeof(addr)) == -1) {
std::cout << "Error bind" << std::endl;
return 0;
}
if (listen(listenfd, LISTEN_COUNT) == -1) {
std::cout << "Error listen" << std::endl;
return 0;
}
pthread_mutex_init(&cm_mutex, NULL);
pthread_t circular_trans_th;
pthread_create(&circular_trans_th, NULL, circular_trans, (void*)&all_connect);
pthread_detach(circular_trans_th);
while (true) {
cliaddr_len = sizeof(cliaddr);
connfd = accept(listenfd, (struct sockaddr*)&cliaddr, &cliaddr_len);
{
connect_message cm;
cm.connfd = connfd;
cm.cliaddr = cliaddr;
pthread_mutex_lock(&cm_mutex);
all_connect.push_back(cm);
pthread_mutex_unlock(&cm_mutex);
}
char sendbuf[20] = {'\0'};
ROS_INFO("connected : %s:%d\r\n", inet_ntop(AF_INET, (void*)&cliaddr.sin_addr, sendbuf, 16), cliaddr.sin_port);
pthread_t th;
pthread_create(&th, NULL, msgRecv, (void*)&connfd);
pthread_detach(th);
ros::spinOnce();
loop_rate.sleep();
}
std::cout << "Socket close" << std::endl;
close(listenfd);
return 0;
}
3.相关函数
3.1 subcallback
ros消息回调函数,其他实现请忽略.
void subcallback(iau_ros_msgs::VehicleHmiConstPtr msg) {
}
void mySigintHandler(int sig){
ROS_INFO("shutting down");
ros::shutdown();
exit(0);
}
3.2 server与client的交互函数
void* msgRecv(void* arg){
std_msgs::String msg;
int newsock=*(int*)arg;
char buf[1024];
while (true){
int len = recv(newsock, buf, sizeof(buf), 0);
if(len>0){
buf[len] = 0x00;
std::string s = buf;
std::cout << s << std::endl;
msg.data=buf;
m_pub->publish(msg);
}
else{
pthread_exit(NULL);
}
}
}
3.3 循环向所有连接发送信息的函数
void* circular_trans(void* msg) {
while (true) {
while(all_connect.empty()) {
std::cout << "sleep3sec\n";
sleep(3);
}
pthread_mutex_lock(&cm_mutex);
for (std::vector<connect_message>::iterator it = all_connect.begin(); it != all_connect.end(); ++it) {
std::string str = "hello world";
int len = send(it->connfd, str.c_str(), str.size(), 0);
if (len < 0) {
close(it->connfd);
char sendbuf[20] = {'\0'};
ROS_INFO("Disconnected : %s:%d\r\n", inet_ntop(AF_INET, (void*)&it->cliaddr.sin_addr, sendbuf, 16), it->cliaddr.sin_port);
all_connect.erase(it);
break;
}
if (it == all_connect.end() - 1) {
break;
}
}
pthread_mutex_unlock(&cm_mutex);
}
}
总结
1.多线程的使用需要小心,detach的线程执行结束应该主动关闭. 2.在数据上锁后数据更新需要重新上锁才能完成. 3.accept写在死循环中可以持续监听连接.
|