QWaitCondition条件变量实现
QWaitCondition 用于多线程的同步:一个线程调用QWaitCondition::wait() 阻塞等待,直到另一个线程调用QWaitCondition::wakeOne() 或 QWaitCondition::wakeAll() 将其唤醒后才继续往下执行。 QWaitCondition和QMutex配合使用,QWaitCondition::wait()必须传入一个已被当前线程锁定的QMutex对象: 1、调用QWaitCondition::wait(Mutex)后,锁定的Mutex将被解锁,调用线程将阻塞; 2、直到另一个线程调用wakeOne()或wakeAll(),wait()会重新锁定Mutex并返回true,程序继续执行wait()后面的代码; 3、wait()第二个参数是等待超时时间(单位:毫秒),如果传ULONG_MAX(默认值)则永不超时;如果等待超时,wait()同样会重新锁定Mutex,并返回false。
下面的例子,在子线程中启动一个tcp客户端,主线程调用该tcp客户端发送数据,同步等待tcp服务器回复数据。
调用示例:
void MainWindow::on_pushButton_fasong_clicked()
{
tcpClientThread *tcpClient = new tcpClientThread("127.0.0.1",9002);
tcpClient->start();
while(!tcpClient->bRunReady)
{
QThread::msleep(100);
continue;
}
tcpClient->SetSocketTimeOut(3*1000);
if (tcpClient->AddSendMsg(ui->textEdit->toPlainText()) <= 0)
{
qDebug()<<"["<<__FILE__<<"]"<<__LINE__<<__FUNCTION__<<"发送数据失败 ";
}
QString revcMsg = tcpClient->GetMsgPack();
qDebug()<<"["<<__FILE__<<"]"<<__LINE__<<__FUNCTION__<<" "<<revcMsg;
tcpClient->DisconnectServer();
if(tcpClient)
tcpClient->deleteLater();
}
tcpclientthread.h//用于在子线程启动tcp客户端
#ifndef TCPCLIENTTHREAD_H
#define TCPCLIENTTHREAD_H
#include <atomic>
#include <QAbstractSocket>
#include <QHostAddress>
#include <QObject>
#include <QThread>
#include "tcpClient.h"
class tcpClientThread : public QThread
{
Q_OBJECT
public:
explicit tcpClientThread(QString ip,qint16 port,QObject *parent = nullptr);
~tcpClientThread();
void SetSocketTimeOut(int nTime);
long AddSendMsg(QString pMsg);
QString GetMsgPack();
short DisconnectServer();
bool bRunReady;
private:
CTCPClient *m_pCTCPClient;
QString m_ip;
qint16 m_port;
private:
void run();
signals:
void signalSetSocketTimeOut(int);
void signalAddSendMsg(QString pMsg, long *pSendNum);
void signalDisconnectServer();
public slots:
};
#endif
tcpclientthread.cpp
#include "tcpclientthread.h"
/**
 * Stores the target address; the socket itself is only created in run().
 *
 * @param ip     Server address, later passed to QHostAddress.
 * @param port   Server port.
 * @param parent Optional QObject parent.
 */
tcpClientThread::tcpClientThread(QString ip, qint16 port, QObject *parent)
    : QThread(parent),
      bRunReady(false),
      // Must start out null: DisconnectServer() tests this pointer and may be
      // called before run() has assigned it.
      m_pCTCPClient(nullptr),
      m_ip(ip),
      m_port(port)
{
}
/**
 * Stops the thread's event loop and joins it if it is still running.
 *
 * The former trailing `exit(0)` resolved to QThread::exit(0) (member lookup),
 * i.e. a request to leave an event loop that has already been stopped above.
 * It had no effect and read like a process exit, so it was removed.
 */
tcpClientThread::~tcpClientThread()
{
    if (isRunning())
    {
        quit();
        wait();
    }
}
void tcpClientThread::SetSocketTimeOut(int nTime)
{
emit signalSetSocketTimeOut(nTime);
}
/**
 * Sends pMsg through the client by emitting signalAddSendMsg.
 *
 * @param pMsg Text to send.
 * @return The value the connected slot stored through the out-pointer, or -1
 *         when no slot ran (for example, the signal is not connected yet).
 */
long tcpClientThread::AddSendMsg(QString pMsg)
{
    // Initialised so that a missing connection yields a defined failure value
    // instead of an indeterminate one.
    long sendNum = -1;
    emit signalAddSendMsg(pMsg,&sendNum);
    return sendNum;
}
/**
 * Waits until the client flags a result (isRev) and returns its message.
 *
 * The flag is consumed (reset to false) before returning. The returned string
 * is whatever the client stored in m_pReturnMsg.
 *
 * Precondition: run() must already have created the client.
 */
QString tcpClientThread::GetMsgPack()
{
    QMutexLocker locker(&m_pCTCPClient->mutex);

    // Re-check the predicate after every wake-up: wakeAll() may release
    // several waiters, and only one of them may consume the flag.
    while (!m_pCTCPClient->isRev)
    {
        m_pCTCPClient->placeNotFull.wait(&m_pCTCPClient->mutex);
    }

    m_pCTCPClient->isRev = false;
    // The copy is made while the mutex is still held.
    return m_pCTCPClient->m_pReturnMsg;
}
short tcpClientThread::DisconnectServer()
{
emit signalDisconnectServer();
this->quit();
this->wait();
if(m_pCTCPClient)
m_pCTCPClient->deleteLater();
return 0;
}
void tcpClientThread::run()
{
m_pCTCPClient = new CTCPClient(m_ip,m_port);
m_pCTCPClient->connectToHost(QHostAddress(m_ip),m_port);
m_pCTCPClient->waitForConnected();
connect(this,SIGNAL(signalSetSocketTimeOut(int)),m_pCTCPClient,SLOT(slotSetSocketTimeOut(int)));
connect(this,SIGNAL(signalAddSendMsg(QString,long*)),m_pCTCPClient,SLOT(slotAddSendMsg(QString,long*)),Qt::BlockingQueuedConnection);
connect(this,SIGNAL(signalDisconnectServer()),m_pCTCPClient,SLOT(slotDisconnectServer()),Qt::BlockingQueuedConnection);
if(m_pCTCPClient->localPort() == 0)
{
QString str = QString("错误信息:%1,IP=[%2], port=[%3], socket=[tcpClientThread::run],连接服务器失败!")
.arg(m_pCTCPClient->errorString().toLatin1().data())
.arg(m_ip)
.arg(QString::number(m_port));
qDebug()<<"["<<__FILE__<<"]"<<__LINE__<<__FUNCTION__<<" "<<str;
}
else
{
QString str = QString("IP=[%1], port=[%2], socket=[tcpClientThread::run],连接服务器成功!").arg(m_ip).arg(QString::number(m_port));
qDebug()<<"["<<__FILE__<<"]"<<__LINE__<<__FUNCTION__<<" "<<str;
}
bRunReady = true;
this->exec();
}
tcpClient.h//自定义tcp客户端,同步发送和接收数据
#ifndef CTCPClient_H
#define CTCPClient_H
#include <QThread>
#include <vector>
#include <QObject>
#include <QMutex>
#include <QTcpSocket>
#include <QWaitCondition>
#include <QTimer>
/**
 * QTcpSocket subclass that pairs each send with a single-shot timeout timer
 * and publishes the reply (or an empty string on timeout) through
 * m_pReturnMsg, guarded by `mutex` and signalled via `placeNotFull`.
 */
class CTCPClient : public QTcpSocket
{
    Q_OBJECT
public:
    CTCPClient(QString ip,qint16 port,QObject *parent=nullptr);
    ~CTCPClient();

    int m_nTimeOut;               ///< Interval passed to m_timer->start() on send.
    bool isRev;                   ///< True once a reply or a timeout has been recorded.
    QMutex mutex;                 ///< Guards isRev and m_pReturnMsg.
    QWaitCondition placeNotFull;  ///< Woken whenever isRev is set to true.
    QString m_pReturnMsg;         ///< Last received text; empty after a timeout.
    QTimer *m_timer = nullptr;    ///< Single-shot timer created in the constructor.

private:
    QString m_ip;
    qint16 m_port;

public:
    /// Disconnects from the host and closes the socket; always returns 0.
    short DisconnectServer();

public slots:
    void slotSetSocketTimeOut(int nTime);
    void slotAddRecvMsg();
    void slotAddSendMsg(QString pMsg, long *pSendNum);
    void slotDisconnectServer();
    void slottimerOut();

private:
    // Not assigned or used by any member function visible here; given a
    // defined value so it is never read as an indeterminate pointer.
    QTcpSocket *m_pQTcpSocket = nullptr;
};
#endif
tcpClient.cpp
#include "TCPClient.h"
#include <unistd.h>
#include <errno.h>
#include <signal.h>
/**
 * Sets the defaults (60000 ms timeout, no pending result), creates the
 * single-shot timeout timer and wires the timer and readyRead() to this
 * object's slots.
 *
 * @param ip     Stored in m_ip.
 * @param port   Stored in m_port.
 * @param parent Optional QObject parent.
 */
CTCPClient::CTCPClient(QString ip, qint16 port, QObject *parent)
    : QTcpSocket(parent),
      m_nTimeOut(60000),
      isRev(false),
      m_pReturnMsg(""),
      m_timer(new QTimer(this)),
      m_ip(ip),
      m_port(port),
      // Previously left indeterminate; no visible code assigns it.
      m_pQTcpSocket(nullptr)
{
    m_timer->setSingleShot(true);
    connect(m_timer , SIGNAL(timeout()) , this , SLOT(slottimerOut()),Qt::DirectConnection) ;
    connect(this,SIGNAL(readyRead()),this,SLOT(slotAddRecvMsg()));
}
/** Tears the connection down via DisconnectServer() before the socket is destroyed. */
CTCPClient::~CTCPClient()
{
    // The status code is always 0, so it is deliberately ignored.
    static_cast<void>(this->DisconnectServer());
}
short CTCPClient::DisconnectServer()
{
this->disconnectFromHost();
this->waitForDisconnected();
this->close();
return 0;
}
void CTCPClient::slotSetSocketTimeOut(int nTime)
{
m_nTimeOut = nTime;
}
/**
 * Slot: arms the timeout timer and writes pMsg, UTF-8 encoded, to the socket.
 *
 * @param pMsg     Text to send.
 * @param pSendNum Out-parameter receiving the return value of write();
 *                 ignored when null.
 */
void CTCPClient::slotAddSendMsg(QString pMsg, long *pSendNum)
{
    qDebug()<<"["<<__FILE__<<"]"<<__LINE__<<__FUNCTION__<<"sendmsg ";
    // Armed unconditionally so that a waiter is always released, even when the
    // write below fails.
    m_timer->start(m_nTimeOut);

    // Write the byte array itself: the const char* overload stops at the first
    // NUL byte and would truncate such payloads.
    const QByteArray payload = pMsg.toUtf8();
    const qint64 written = this->write(payload);

    if (pSendNum)
    {
        *pSendNum = static_cast<long>(written);
    }
}
void CTCPClient::slotDisconnectServer()
{
this->disconnectFromHost();
this->waitForDisconnected();
this->close();
}
void CTCPClient::slottimerOut()
{
qDebug()<<"["<<__FILE__<<"]"<<__LINE__<<__FUNCTION__<<"time out ";
mutex.lock();
isRev = true;
m_pReturnMsg = "";
placeNotFull.wakeAll();
mutex.unlock();
}
void CTCPClient::slotAddRecvMsg()
{
qDebug()<<"["<<__FILE__<<"]"<<__LINE__<<__FUNCTION__<<"jieshoudao slotAddRecvMsg";
QByteArray array = this->readAll();
m_timer->stop();
mutex.lock();
isRev = true;
m_pReturnMsg = array.data();
placeNotFull.wakeAll();
mutex.unlock();
}
QMutex线程锁实现
一个应用程序的各个线程常常需要协同工作,有序使用资源;系统中的一些资源在同一时刻只允许一个线程使用,不允许多个线程同时使用。 类QMutex的一个对象被称为一个互斥体。一个互斥体有未锁定(unlocked)和锁定(locked)两种状态,初始状态为未锁定。 1、当某个线程A首次调用QMutex的lock函数时,该互斥体变为锁定状态,函数立即返回,该线程继续运行; 2、此后,其他某个线程B调用lock函数时,由于该互斥体已经是锁定状态,线程B无法获得该互斥体,因而被设置为阻塞状态(blocked)并暂停运行,直到线程A调用QMutex的unlock函数解除对该互斥体的锁定。 QMutexLocker类的使用: 1、该类的构造函数接收一个互斥体,并在构造函数中调用该互斥体的lock函数; 2、该类的析构函数调用互斥体的unlock函数。在程序中简单地定义一个QMutexLocker对象,即可申请锁定该互斥体; 3、当程序运行到这个对象的作用域之外时,该对象的析构函数被调用,即可解除对互斥体的锁定。
下面的例子,两个子线程通过线程锁有序使用全局变量: mainwindow.h
#ifndef MAINWINDOW_H
#define MAINWINDOW_H
#include <QMainWindow>
#include <QThread>
#include <QMutex>
#include <QDebug>
// Forward declaration of Ui::MainWindow; its definition is not part of this header.
namespace Ui {
class MainWindow;
}
// Forward declarations of the two worker classes defined further down, so that
// MainWindow can hold pointers to them.
class ThreadTwo_movetoThr;
class ThreadOne_Run;
class MainWindow : public QMainWindow
{
Q_OBJECT
public:
explicit MainWindow(QWidget *parent = nullptr);
~MainWindow();
private:
Ui::MainWindow *ui;
ThreadOne_Run *pThreadOne_Run;
ThreadTwo_movetoThr *pThreadTwo_movetoThr;
QThread *pobjThreadTwo_movetoThr;
signals:
void signal_dealTask();
};
class ThreadOne_Run : public QThread
{
Q_OBJECT
public:
explicit ThreadOne_Run(QObject *parent = nullptr);
protected:
void run();
};
class ThreadTwo_movetoThr : public QObject
{
Q_OBJECT
public:
explicit ThreadTwo_movetoThr(QObject *parent = nullptr);
public slots:
void slot_dealTask();
};
// Shared counter incremented by both worker threads (defined in mainwindow.cpp).
extern int Add;
// Global mutex that both workers lock around their accesses to Add.
extern QMutex mutex;
#endif
mainwindow.cpp
#include "mainwindow.h"
#include "ui_mainwindow.h"
// Shared counter, starting at 3; ThreadOne_Run::run() and
// ThreadTwo_movetoThr::slot_dealTask() each increment it eight times.
int Add=3;
// Serialises the two workers' accesses to Add.
QMutex mutex;
/**
 * Builds the UI, creates both workers, moves the QObject worker onto its own
 * thread, and starts both threads.
 *
 * Ownership:
 *  - pThreadOne_Run and pobjThreadTwo_movetoThr are children of this window
 *    and are destroyed with it.
 *  - pThreadTwo_movetoThr has no parent (required for moveToThread) and is
 *    released through deleteLater() when its host thread finishes.
 */
MainWindow::MainWindow(QWidget *parent) :
    QMainWindow(parent),
    ui(new Ui::MainWindow)
{
    ui->setupUi(this);
    qDebug()<<"["<<__FILE__<<"]"<<__LINE__<<__FUNCTION__<<"MainWindow主线程ID "<<QThread::currentThread();
    pThreadOne_Run = new ThreadOne_Run(this);
    pThreadTwo_movetoThr = new ThreadTwo_movetoThr;
    pobjThreadTwo_movetoThr = new QThread(this);

    // Free the parentless worker object when its thread finishes; without this
    // it is never deleted.
    connect(pobjThreadTwo_movetoThr,&QThread::finished,pThreadTwo_movetoThr,&QObject::deleteLater);
    // Deliberately NO finished -> deleteLater connection for the two thread
    // objects: both are owned by this window, and ~MainWindow() dereferences
    // the stored pointers. Letting pThreadOne_Run delete itself as soon as its
    // short run() returns left a dangling pointer behind.
    connect(this,&MainWindow::signal_dealTask,pThreadTwo_movetoThr,&ThreadTwo_movetoThr::slot_dealTask);

    pThreadTwo_movetoThr->moveToThread(pobjThreadTwo_movetoThr);
    pobjThreadTwo_movetoThr->start();
    emit signal_dealTask();
    pThreadOne_Run->start();
}
/**
 * Stops both worker threads (if still running) and releases the UI.
 *
 * NOTE(review): assumes both stored thread pointers still refer to live
 * objects at this point - verify against how the constructor wires
 * deleteLater().
 */
MainWindow::~MainWindow()
{
    // Ask a running thread to leave its event loop, then join it.
    const auto stopIfRunning = [](QThread *thread) {
        if (!thread->isRunning())
            return;
        thread->quit();
        thread->wait();
    };

    stopIfRunning(pThreadOne_Run);
    stopIfRunning(pobjThreadTwo_movetoThr);

    delete ui;
}
/** Forwards the parent to QThread; there is no state of its own to set up. */
ThreadOne_Run::ThreadOne_Run(QObject *parent)
    : QThread(parent)
{}
void ThreadOne_Run::run()
{
mutex.lock();
for(int index=0;index<8;index++)
{
qDebug()<<"["<<__FILE__<<"]"<<__LINE__<<__FUNCTION__<<"ThreadOne_Run子线程ID "<<QThread::currentThread()<<"Add="<<Add;
Add++;
}
mutex.unlock();
}
/** Forwards the parent to QObject; there is no state of its own to set up. */
ThreadTwo_movetoThr::ThreadTwo_movetoThr(QObject *parent)
    : QObject(parent)
{}
/**
 * Slot: while holding the global mutex, logs and increments the shared
 * counter Add eight times. The lock is released when the slot returns.
 */
void ThreadTwo_movetoThr::slot_dealTask()
{
    QMutexLocker locker(&mutex);

    int remaining = 8;
    while (remaining-- > 0)
    {
        qDebug()<<"["<<__FILE__<<"]"<<__LINE__<<__FUNCTION__<<"ThreadTwo_movetoThr子线程ID "<<QThread::currentThread()<<"Add="<<Add;
        ++Add;
    }
}
|