producer.php
<?php
try {
    // 1. Open a connection to the AMQP broker.
    $connection = new AMQPConnection([
        'host' => '127.0.0.1',
        'port' => 5672,
        'vhost' => '/',
        'login' => 'guest',
        'password' => 'guest'
    ]);
    $connection->connect();
    // 2. Open a channel on that connection.
    $channel = new AMQPChannel($connection);
    // 3. Declare the direct exchange the message is published to.
    $exchange = new AMQPExchange($channel);
    $exchangeName = 'trade';
    $exchange->setName($exchangeName);
    $exchange->setType(AMQP_EX_TYPE_DIRECT);
    $exchange->declareExchange();
    // 4. Build the payload and publish it with the routing key.
    $data = [
        'tid' => uniqid(),
        'msg' => 'trade'
    ];
    $routingKey = '/trade';
    // The routing key must be passed explicitly: a direct exchange only
    // delivers to queues bound with a matching key, and the consumer binds
    // its queue with '/trade'. Publishing without it leaves the message
    // unroutable.
    $exchange->publish(json_encode($data), $routingKey);
} catch (Exception $e) {
    // Connection, channel, exchange and publish failures all end up here;
    // only the message text is printed.
    echo $e->getMessage();
}
consumer.php
<?php
// Number of worker processes in the pool (the original author's rule of
// thumb: typically 4x the number of CPU cores).
$workerNum = 4;
// Process pool that will host the consumer workers.
$pool = new \Swoole\Process\Pool($workerNum);
// WorkerStart handler: the callback below opens its own AMQP connection,
// channel and queue, then consumes messages.
$pool->on('WorkerStart', function($pool, $workerId) {
    // Runs inside the worker process.
    echo "WorkerId {$workerId} is started \n";
    try {
        // 1. Open a connection to the AMQP broker.
        $connection = new AMQPConnection([
            'host' => '127.0.0.1',
            'port' => 5672,
            'vhost' => '/',
            'login' => 'guest',
            'password' => 'guest'
        ]);
        $connection->connect();
        // 2. Open a channel on that connection.
        $channel = new AMQPChannel($connection);
        // 3. Declare the exchange with the same name and type the producer
        //    uses, so binding below succeeds even when the consumer is
        //    started before the producer has created the exchange.
        $exchangeName = 'trade';
        $exchange = new AMQPExchange($channel);
        $exchange->setName($exchangeName);
        $exchange->setType(AMQP_EX_TYPE_DIRECT);
        $exchange->declareExchange();
        // 4. Declare the queue.
        $queueName = 'trade';
        $queue = new AMQPQueue($channel);
        $queue->setName($queueName);
        $queue->declareQueue();
        // 5. Bind the queue to the exchange under the routing key the
        //    producer publishes with.
        $routingKey = '/trade';
        $queue->bind($exchangeName, $routingKey);
        // Blocks here; the callback runs once per delivered message.
        $queue->consume(function($envelope, $queue) use ($workerId) {
            // Dump the worker id with the body to show which process
            // handled the message.
            var_dump($workerId, $envelope->getBody());
            // Acknowledge the message so the broker can discard it.
            $queue->ack($envelope->getDeliveryTag());
        });
    } catch (Exception $e) {
        // Any AMQP failure ends the consume loop; only the message text
        // is printed.
        echo $e->getMessage();
    }
});
// WorkerStop handler: report which worker process has stopped.
$pool->on('WorkerStop', function($pool, $workerId) {
    echo "WorkerId {$workerId} is stoped\n";
});
// Launch the worker processes. Without this call the pool is only
// configured: the handlers registered above never run and the script
// exits without consuming anything.
$pool->start();
|