纯 Swoole 的做法也差不多,就是监听 OnPipeMessage 事件,并通过 sendMessage 发送消息。
注册一个监听器,同时监听 OnPipeMessage 和 PipeMessage 事件(前者在 Worker/Task 进程收到消息时触发,后者在自定义进程收到消息时触发):
use Hyperf\Event\Contract\ListenerInterface;
use Hyperf\Framework\Event\OnPipeMessage;
use Hyperf\Process\Event\PipeMessage as UserProcessPipeMessage;
use Psr\Container\ContainerInterface;
/**
 * Listener for inter-process pipe messages.
 *
 * Subscribes to both OnPipeMessage and the process-level PipeMessage event
 * (imported as UserProcessPipeMessage) and reacts only when the event carries
 * a MyPipeMessage payload.
 */
class OnPipeMessageListener implements ListenerInterface
{
    /**
     * Container injected at construction time; not used by the visible code.
     *
     * @var ContainerInterface
     */
    private $container;

    /**
     * @param ContainerInterface $container container stored for later use
     */
    public function __construct(ContainerInterface $container)
    {
        $this->container = $container;
    }

    /**
     * @return string[] class names of the events this listener subscribes to
     */
    public function listen(): array
    {
        return [
            OnPipeMessage::class,
            UserProcessPipeMessage::class,
        ];
    }

    /**
     * Handle a pipe-message event.
     *
     * Events of any other type, and events whose payload is not a
     * MyPipeMessage, are ignored.
     *
     * @param object $event the dispatched event; its `data` property holds the payload
     */
    public function process(object $event)
    {
        if (! ($event instanceof OnPipeMessage) && ! ($event instanceof UserProcessPipeMessage)) {
            return;
        }

        // The sending code constructs MyPipeMessage instances, so that is the
        // payload type to match here (the previous check against
        // MyOnPipeMessage could never succeed for those messages).
        if ($event->data instanceof MyPipeMessage) {
            // Handle the received message here.
        }
    }
}
向其他进程发送数据
use Swoole\Server;
use Hyperf\Process\ProcessCollector;
......
// Broadcast a MyPipeMessage to every other worker / task worker, and then to
// every registered custom process.
$server = $this->container->get(Server::class);

// One message instance is enough: it is only read by the calls below.
$message = new MyPipeMessage('message');

// Highest worker id: ids 0 .. worker_num + task_worker_num - 1 are iterated,
// i.e. workers and task workers together.
$maxWorkerId = $server->setting['worker_num'] + ($server->setting['task_worker_num'] ?? 0) - 1;

// No outer "more than one worker" guard: when this runs outside of worker 0
// (e.g. in a custom process) a lone worker must still receive the message.
// The per-iteration check below already prevents sending to ourselves.
for ($workerId = 0; $workerId <= $maxWorkerId; ++$workerId) {
    if ($server->worker_id === $workerId) {
        continue;
    }

    $server->sendMessage($message, $workerId);
}

// Custom processes are reached through their exported socket rather than
// sendMessage(), so the message is serialized by hand.
if (class_exists(ProcessCollector::class) && ! ProcessCollector::isEmpty()) {
    $payload = serialize($message);

    foreach (ProcessCollector::all() as $process) {
        // Second argument is the send timeout (seconds, per the Swoole socket API).
        // NOTE(review): the return value is ignored, so a failed send is silent;
        // consider logging when send() returns false.
        $process->exportSocket()->send($payload, 10);
    }
}
|