目录
一:安装libkafka扩展
二:安装rdkafka扩展
三:PHP代码操作链接
四:rdkafka官方文档
需要安装:libkafka,rdkafka扩展
一:安装libkafka扩展
- 下载 去GitHub上:?
git clone https://github.com/edenhill/librdkafka.git - 安装(执行命令)
cd librdkafka/
./configure && make && make install
二:安装rdkafka扩展
- 下载
git clone https://github.com/arnaud-lb/php-rdkafka
cd? php-rdkafka/ ? - 为php安装扩展 在php-rdkafka这个目录下
执行:phpize
然后会生成源代码安装的脚本 把php-config的位置改成自己php-config的位置
执行:./configure --with-php-config=/usr/local/Cellar/php\@7.1/7.1.28_1/bin/php-config(这是我的文件地址)
编译安装(执行该命令)
make && make install
成功后会出现一个文件夹地址: Installing shared extensions:?/usr/local/Cellar/php@7.1/7.1.28_1/pecl/20160303/rdkafka.so?
--->这个地址是需要配置到你的php.ini文件中的 ? - cd?/usr/local/Cellar/php@7.1/7.1.28_1/pecl/20160303/? ?会有个扩展文件:rdkafka.so
? - 修改php.ini文件(添加扩展配置文件)
? - 重启PHP
sudo /usr/local/Cellar/php\@7.1/7.1.28_1/sbin/php-fpm -D?
? - 执行php -m 查看安装已有的扩展
? 执行echo phpinfo(); 也可以查看
三:PHP代码操作链接
try {
//实例化对象
$rcf = new \RdKafka\Conf(); //需要用绝对路径
$rcf->set('group.id', 'leyangjun_test_g');
$cf = new \RdKafka\TopicConf();
/*
$cf->set('offset.store.method', 'file');
*/
$cf->set('auto.offset.reset', 'smallest');
$cf->set('auto.commit.enable', true);
$rk = new \RdKafka\Consumer($rcf);
//设置错误级别
$rk->setLogLevel(LOG_DEBUG);
//设置Broker地址(可以多个,用逗号分隔即可)
$rk->addBrokers("leyangjun.com:9092");
//创建一个主题实例,并且从分区0开始消费
$topic = $rk->newTopic("leyangjun_topic", $cf);
//$topic->consumeStart(0, RD_KAFKA_OFFSET_BEGINNING);
while (true) {
$topic->consumeStart(0, RD_KAFKA_OFFSET_STORED);
$info = $topic->consume(0, 1000);
var_dump($info);
if ($msg->err) {
echo $msg->errstr(), "\n";
break;
} else {
echo $msg->payload, "\n";
}
$topic->consumeStop(0);
sleep(1);
}
} catch (Exception $e) {
echo $e->getMessage();
}
https://libraries.io/github/mentionapp/php-rdkafka
|