IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> PHP知识库 -> Laravel 实现 Kafka 消息推送与接收处理 -> 正文阅读

[PHP知识库]Laravel 实现 Kafka 消息推送与接收处理

 "require": {
        "php": ">=7.3",
        "laravel/lumen-framework": "^6.*",
        "nmred/kafka-php": "v0.2.0.8"
    },

?

创建 KafkaService

<?php


namespace App\Services;
use Kafka;

class KafkaService
{
    public function __construct()
    {
        date_default_timezone_set('PRC');
    }

    /*
     * Produce
     */
    public function Producer($topic, $value , $url)
    {
        $config = \Kafka\ProducerConfig::getInstance();
        $config->setMetadataRefreshIntervalMs(10000);
        $config->setMetadataBrokerList($url);
        $config->setBrokerVersion('1.0.0');
        $config->setRequiredAck(1);
        $config->setIsAsyn(false);
        $config->setProduceInterval(500);
        $producer = new \Kafka\Producer(function () use($value,$topic){
            return [
                [
                    'topic' => $topic,
                    'value' => $value,
                    'key' => '',
                ],
            ];
        });
        $producer->success(function ($result){
            return "success";
        });
        $producer->error(function ($errorCode){
            var_dump($errorCode);
        });
        $producer->send(true);
    }

    /*
     * Consumer
     */
    public function consumer($group,$topics , $url){
        $config = \Kafka\ConsumerConfig::getInstance();
        $config->setMetadataRefreshIntervalMs(500);
        $config->setMetadataBrokerList($url);
        $config->setGroupId($group);
        $config->setBrokerVersion('0.9.0.1');
        $config->setTopics([$topics]);
        $config->setOffsetReset('earliest');
        $consumer = new \Kafka\Consumer();
        $consumer->start(function($topic, $part, $message) {
            echo "receive a message:".$message['message']['value']."\n";
            app('consumerKafka')->consumerData($message['message']['value']);//你的接收处理逻辑
            file_put_contents("consumer.log",$message['message']['value']);
        });
    }
}

执行 produce 方法生产消息

<?php


namespace App\Services;
use App\Http\Services\KafkaService;

class ProduceService
{
    public function produce()
    {
        $topic = env('KAFKA_TOPIC'); //配置在env中
        $url = env('KAFKA_URL'); //配置在env中
        $value =
            [
                'code' => 'test',
                'data_type' => 'personal',
                'action' => 'update',
                'data' =>
                    [
                        'id' => 1,
                        'name' => 'tom',
                        'gender' => 2
                    ],
                'redirect_url' => '',
                'operator' => 'system',
            ];
        $value = json_encode ($value, JSON_FORCE_OBJECT );
        $kafka = new KafkaService();
        $kafka->Producer($topic, $value , $url);
    }
}

执行 php artisan consumer:kafka 消费消息

php artisan consumer:kafka
<?php
namespace App\Console\Commands;
use Illuminate\Console\Command;
use Illuminate\Support\Facades\Redis;

class ConsumerKafka extends Command
{
    /**
     * The name and signature of the console command.
     *
     * @var string
     */
    protected $signature = 'consumer:kafka';

    /**
     * The console command description.
     *
     * @var string
     */
    protected $description = '处理异步kafka消息';

    /**
     * Create a new command instance.
     */
    public function __construct()
    {
        parent::__construct();
    }
    /**
     * Execute the console command.
     *
     * @return mixed
     */
    public function handle()
    {
        $this->log('开始监听消息...');
        app('kafkaService')->consumer(
            $group=env('KAFKA_GROUP'),
            $topics =env('KAFKA_TOPIC'),
            $url=env('KAFKA_URL')
        );
        return $this;
    }

    private function log($msg = '')
    {
        if (!$msg) {
            return $this;
        }
        if (php_sapi_name() == 'cli') {
            echo $msg, PHP_EOL;
        }
        file_put_contents("kafka.log",$msg);
        //app('myLog')->lumenLog($msg, 'kafka_consumer');
        return $this;
    }
}

注册config/app.php

 'aliases' => [

        'kafkaService' => App\Services\KafkaService::class,
        'consumerKafka'=>App\Services\ConsumerService::class
]

修改.env

KAFKA_GROUP=192.168.102.46:2181
KAFKA_TOPIC=test
KAFKA_URL=192.168.102.46:9092

kafka部署方法docker部署kafka_飞鱼计划-CSDN博客

  PHP知识库 最新文章
Laravel 下实现 Google 2fa 验证
UUCTF WP
DASCTF10月 web
XAMPP任意命令执行提升权限漏洞(CVE-2020-
[GYCTF2020]Easyphp
iwebsec靶场 代码执行关卡通关笔记
多个线程同步执行,多个线程依次执行,多个
php 没事记录下常用方法 (TP5.1)
php之jwt
2021-09-18
上一篇文章      下一篇文章      查看所有文章
加:2022-01-08 13:45:21  更:2022-01-08 13:46:01 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年12日历 -2024/12/27 5:19:13-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码
数据统计