由于Kafka是由Java编写的,所以我们需要先安装 java 的 jdk
大致运行原理

1. producer 先从 zookeeper 的 “/brokers/…/state” 节点找到该 partition 的 leader 2. producer 将消息发送给该 leader 3. leader 将消息写?本地 log 4. followers 从 leader pull 消息,写?本地 log 后 leader 发送 ACK(确认字符) 5. leader 收到所有 ISR 中的 replica 的 ACK 后,增加 HW(high watermark,最后 commit 的 offset) 并向 producer 发送 ACK
下载网址
然后下载到任意位置,进行配置环境变量




然后打开命令行进行测试看jdk是否安装成功

在框架中安装Kafka扩展的时候需要在框架中进行一个扩展的安装【注意下载的时候要和我们的PHP版本相对应】
下载扩展包

然后在我们的小皮里面进行配置
1.将上面的第一个文件放入配置文件中

2.然后再将第二个文件放入下面的配置中

3,最后在我们的小皮中进行一个添加,重启小皮

然后我们能够通过phpinfo()查到php中的rdKafka

然后我们进行启动【备注,代码里面&这里代表常连接】
1.zookeeper 启动命令:【单独开启一个服务】
bin/windows/zookeeper-server-start.bat config/zookeeper.properties &
2.kafka 启动命令:【单独开启一个服务】
bin/windows/kafka-server-start.bat config/server.properties &
3.创建topic:【3、4、5在一个窗口下执行】
bin/windows/kafka-topics.bat --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test
4.查看创建的topic
bin/windows/kafka-topics.bat --list --bootstrap-server localhost:9092
5.启动生产者 producer
bin/windows/kafka-console-producer.bat --broker-list localhost:9092 --topic test
6.启动消费者 customer【消费者在一个窗口下执行】
bin/windows/kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test --from-beginning




然后在我们的tp6框架中进行composer安装
composer require noname007/kafka-php

然后我们就可以使用代码在框架中进行生产和消费了
use Kafka\lib\ProducerConfig;
public function producer()
{
$config = ProducerConfig::getInstance();
$config->setMetadataRefreshIntervalMs(10000);
$config->setMetadataBrokerList('127.0.0.1:9092');
$config->setBrokerVersion('1.0.0');
$config->setRequiredAck(1);
$config->setIsAsyn(false);
$config->setProduceInterval(500);
$producer = new \Kafka\Producer();
for($i = 0; $i < 100; $i++ ) {
$producer->send([
[
'topic' => 'wxq',
'value' => 'test'.$i,
],
]);
}
}
public function consumer()
{
$config = ProducerConfig::getInstance();
$config->setMetadataRefreshIntervalMs(10000);
$config->setMetadataBrokerList('127.0.0.1:9092');
$config->setGroupId('wxq');
$config->setBrokerVersion('1.0.0');
$config->setTopics(['wxq']);
$consumer = new \Kafka\Consumer();
$consumer->start(function($topic, $part, $message) {
var_dump($message);
});
}
public function consumer()
{
$config = ProducerConfig::getInstance();
$config->setMetadataRefreshIntervalMs(10000);
$config->setMetadataBrokerList('127.0.0.1:9092');
$config->setGroupId('wxq');
$config->setBrokerVersion('1.0.0');
$config->setTopics(['wxq']);
$consumer = new \Kafka\Consumer();
$consumer->start(function($topic, $part, $message) {
var_dump($message);
});
}
|