RabbitMQ client 通过connection与RabbitMQ Server建立连接。且有心跳保活机制,
AMQPCPP库中,Server首先通过TcpHandler::onNegotiate向Client协商心跳的interval(默认60s),client的返回值代表建议的值。
/**
* Method that is called when the heartbeat frequency is negotiated
* between the server and the client. Applications can override this method
* if they want to use a different heartbeat interval (for example: return 0
* to disable heartbeats)
* @param connection The connection that suggested a heartbeat interval
* @param interval The suggested interval from the server
* @return uint16_t The interval to use
*
* @see ConnectionHandler::onNegotiate
*/
virtual uint16_t onNegotiate(TcpConnection *connection, uint16_t interval)
{
// make sure compilers dont complain about unused parameters
(void) connection;
(void) interval;
// default implementation, suggested heartbeat is ok
return interval;
}
?协商通过以后,Server会定期发Heartbeat给Client(间隔一般为协商的interval/2)探测Client是否存活。
/**
* Method that is called when the server sends a heartbeat to the client
* @param connection The connection over which the heartbeat was received
* @see ConnectionHandler::onHeartbeat
*/
virtual void onHeartbeat(TcpConnection *connection)
{
// make sure compilers dont complain about unused parameters
(void) connection;
}
-问题:
我最初在基于AMQPCPP库实现RabbitMQ Client的时候,总是收到ConnectionLost的回调错误,触发Client重连到另一个RabbitMQ节点上。原因是因为我没有在onHeartbeat发送HeartBeatFrame给Server端。在里面调用connection->heartbeat()函数以后问题解决:
void onHeartbeat(AMQP::TcpConnection *connection) override
{
_logger.lock()->debug("MyHandler::onHeartbeat");
connection->heartbeat();
}
?
?
|