使用前需要先进行Kafka的配置和Java JDK的安装
参考文档
然后在框架中安装Excel
composer require phpmailer/phpmailer
然后在配置文件中进行公共参数的设置
'email' => [
'CharSet' => 'UTF-8',
'Host' => 'smtp.qq.com',
'SMTPAuth' => true,
'Username' => '1919814727@qq.com',
'Password' => 'izhbczgidupvcfci',
'SMTPSecure' => 'ssl',
'Port' => '465',
],
然后封装公共方法
if(!function_exists('sendEmail')) {
function sendMail($toemails)
{
$toemail = $toemails;
$mail = new PHPMailer\PHPMailer\PHPMailer();
$mail->isSMTP();
$mail->CharSet = \think\facade\Config::get('app.email.CharSet');
$mail->Host = \think\facade\Config::get('app.email.Host');
$mail->SMTPAuth = \think\facade\Config::get('app.email.SMTPAuth');
$mail->Username = \think\facade\Config::get('app.email.Username');
$mail->Password = \think\facade\Config::get('app.email.Password');
$mail->SMTPSecure = \think\facade\Config::get('app.email.SMTPSecure');
$mail->Port = \think\facade\Config::get('app.email.Port');
$mail->setFrom("1919814727@qq.com", "Mailer");
$mail->addAddress($toemail, 'YangShuBin');
$mail->addReplyTo("1919814727@qq.com", "WangXiaoQiang");
$mail->Subject = "这是一个测试邮件";
$mail->Body = "邮件内容是 <b>您的验证码是:123456</b>,哈哈哈!";
if (!$mail->send()) {
echo "Message could not be sent.";
echo "Mailer Error: " . $mail->ErrorInfo;
} else {
echo '发送成功';
}
}
}
启动kafka
输入以下命令
bin/windows/zookeeper-server-start.bat config/zookeeper.properties
再打开一个命令框
bin/windows/kafka-server-start.bat config/server.properties
添加生产者
public function sendEmail()
{
$rk = new \RdKafka\Producer();
$rk->addBrokers("127.0.0.1:9092");
$topic = $rk->newTopic("wxq");
$toemails = [
'YSB'=>'1sfds015@qq.com',
'TYS'=>'g.ysdfsan.s@qq.com',
'TYS1'=>'30473ssdfsf29@qq.com',
'YLY'=>'39sdff28sdfsdf7213@qq.com',
'LGF'=>'84fsd9127@qq.com',
];
foreach($toemails as $k=>$v) {
$topic->produce(RD_KAFKA_PARTITION_UA, 0, $this->sendMails($v));
$rk->poll(0);
sleep(5);
}
while ($rk->getOutQLen() > 0) {
$rk->poll(50);
}
}
public function sendMails($toemails)
{
sendMail($toemails);
}
<?php
$conf = new \RdKafka\Conf();
$conf->set('group.id', 'myConsumerGroup');
$rk = new \RdKafka\Consumer($conf);
$rk->addBrokers("127.0.0.1:9092");
$topicConf = new \RdKafka\TopicConf();
$topicConf->set('auto.commit.interval.ms', 100);
$topicConf->set('offset.store.method', 'file');
$topicConf->set('offset.store.path', sys_get_temp_dir());
$topicConf->set('auto.offset.reset', 'smallest');
$topic = $rk->newTopic("wxq", $topicConf);
$topic->consumeStart(0, RD_KAFKA_OFFSET_STORED);
while (true) {
$message = $topic->consume(0, 1 * 10000);
switch ($message->err) {
case RD_KAFKA_RESP_ERR_NO_ERROR:
var_dump($message);
break;
case RD_KAFKA_RESP_ERR__PARTITION_EOF:
echo "等待接收信息\n";
break;
case RD_KAFKA_RESP_ERR__TIMED_OUT:
echo "超时\n";
break;
default:
throw new \Exception($message->errstr(), $message->err);
break;
}
}
创建消费者(这个放在原生php文件中就可以,需要使用cli运行模式来运行,否则出不来效果)
|