Setting up Kafka
1. Upload the tarball to any node.
2. Extract it and configure the environment variables on every node.
3. Edit config/server.properties (a sample snippet follows this list):
   1. broker.id=0 -- every node must use a different broker.id
   2. zookeeper.connect=master:2181,node1:2181,node2:2181
   3. log.dirs=/usr/local/soft/kafka_2.11-1.0.0/data -- where the messages are stored
4. Copy the installation to the other nodes:
   scp -r kafka_2.11-1.0.0 node1:`pwd`
   scp -r kafka_2.11-1.0.0 node2:`pwd`
5. Adjust broker.id on each node: master=0, node1=1, node2=2
6. Start the services (Kafka does not depend on Hadoop, but it does depend on ZooKeeper):
   1. Start ZooKeeper on every node:
      zkServer.sh start
      Check its status:
      zkServer.sh status
   2. Start the broker on every node (Kafka has a decentralized architecture; -daemon runs it in the background):
      kafka-server-start.sh -daemon /usr/local/soft/kafka_2.11-1.0.0/config/server.properties
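Putting the three edits from step 3 together, a minimal sketch of the relevant lines in config/server.properties on master (node1 and node2 keep the same values except for broker.id):

# config/server.properties on master; use broker.id=1 on node1 and broker.id=2 on node2
broker.id=0
zookeeper.connect=master:2181,node1:2181,node2:2181
log.dirs=/usr/local/soft/kafka_2.11-1.0.0/data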
1. Create a topic
   --replication-factor : number of replicas per partition
   --partitions : number of partitions, set according to the data volume
   In pseudo-distributed mode a replication factor of 1 is enough:
   kafka-topics.sh --create --zookeeper master:2181 --replication-factor 1 --partitions 3 --topic test_topic1
2. Describe a topic
   kafka-topics.sh --describe --zookeeper master:2181 --topic test_topic1
3. List all topics
   kafka-topics.sh --list --zookeeper master:2181
4. Start a console producer
   kafka-console-producer.sh --broker-list master:9092 --topic test_topic1
5. Start a console consumer (--from-beginning consumes from the beginning; without it, only data produced after the consumer starts is consumed)
   kafka-console-consumer.sh --bootstrap-server master:9092 --from-beginning --topic test_topic1
Resetting Kafka
1. Stop Kafka: kill -9 the broker process
2. Delete the metadata in ZooKeeper: in zkCli.sh, remove all Kafka-related znodes
   ls /
   rmr /config
   rmr /brokers
3. Delete Kafka's data directory on every node:
   rm -rf /usr/local/soft/kafka_2.11-1.0.0/data
4. Restart the brokers:
   kafka-server-start.sh -daemon /usr/local/soft/kafka_2.11-1.0.0/config/server.properties
Using Flink with Kafka in IDEA
Configuration file (pom.xml)
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>ShuJia01</artifactId>
        <groupId>ShuJia</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>kafka</artifactId>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_2.11</artifactId>
            <version>1.11.2</version>
        </dependency>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>2.11.12</version>
        </dependency>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-compiler</artifactId>
            <version>2.11.12</version>
        </dependency>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-reflect</artifactId>
            <version>2.11.12</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <!-- Java Compiler -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>

            <!-- Scala Compiler -->
            <plugin>
                <groupId>org.scala-tools</groupId>
                <artifactId>maven-scala-plugin</artifactId>
                <version>2.15.2</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
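Note that the Flink streaming Scala API used in the code below (org.apache.flink.streaming.api.scala._) is not declared in this module; presumably it is inherited from the parent ShuJia01 POM. If it is not, a dependency along these lines would be needed (flink-clients_2.11 may also be required to run jobs inside IDEA):

<!-- assumed to be inherited from the parent ShuJia01 POM; add only if it is not -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-scala_2.11</artifactId>
    <version>1.11.2</version>
</dependency>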
Consuming the producer's data in IDEA with Flink
package com.shujia.source

import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer

object Demo3KafkaProducer {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "192.168.5.201:9092")
    properties.setProperty("group.id", "test")

    // create the Flink Kafka consumer
    val flinkKafkaConsumer = new FlinkKafkaConsumer[String]("test_topic1", new SimpleStringSchema(), properties)

    // flinkKafkaConsumer.setStartFromEarliest()      // start from the earliest available records
    // flinkKafkaConsumer.setStartFromLatest()        // start from the latest records
    // flinkKafkaConsumer.setStartFromTimestamp(...)  // start from the given timestamp (milliseconds)

    /**
     * The default is setStartFromGroupOffsets():
     * if the consumer group does not exist yet, read the latest data;
     * if it already exists, continue from its committed offsets.
     */
    flinkKafkaConsumer.setStartFromEarliest() // here we explicitly start from the earliest offsets instead

    val kafkaDS: DataStream[String] = env.addSource(flinkKafkaConsumer)

    kafkaDS.print()

    env.execute()
  }
}
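The demo above only reads from Kafka; the same connector also provides FlinkKafkaProducer for writing a stream back to Kafka. Below is a minimal sketch of that direction; the object name Demo4FlinkKafkaSink and the topic test_topic2 are made up for this example and are not part of the original project.

package com.shujia.source

import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer

// illustrative object name, not from the original project
object Demo4FlinkKafkaSink {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "master:9092")

    // a small in-memory stream just for illustration
    val wordsDS: DataStream[String] = env.fromElements("java", "scala", "flink")

    // sink: write every element as a plain string to the assumed topic test_topic2
    val kafkaSink = new FlinkKafkaProducer[String]("test_topic2", new SimpleStringSchema(), properties)
    wordsDS.addSink(kafkaSink)

    env.execute()
  }
}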
Consuming the student data
package com.shujia

import java.util
import java.util.Properties

import org.apache.kafka.clients.consumer.{ConsumerRecord, ConsumerRecords, KafkaConsumer}

object Demo3Comsumer {
  def main(args: Array[String]): Unit = {
    // 1. create the consumer
    val properties = new Properties()
    // address of the Kafka brokers
    properties.setProperty("bootstrap.servers", "master:9092")
    properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    properties.setProperty("group.id", "asdasdd")

    /**
     * auto.offset.reset:
     * earliest - when a partition has a committed offset, consume from it; otherwise consume from the beginning
     * latest   - when a partition has a committed offset, consume from it; otherwise consume only data produced after the consumer starts
     * none     - when every partition has a committed offset, consume from them; if any partition has none, throw an exception
     */
    // read from the earliest data
    properties.put("auto.offset.reset", "earliest")

    val consumer = new KafkaConsumer[String, String](properties)
    println("connection created successfully")

    // subscribe to the topic
    val topics = new util.ArrayList[String]()
    topics.add("student2")
    consumer.subscribe(topics)

    while (true) {
      // poll for data
      val records: ConsumerRecords[String, String] = consumer.poll(1000)
      println("consuming data")

      // iterate over all records returned by this poll
      val iterator: util.Iterator[ConsumerRecord[String, String]] = records.iterator()
      while (iterator.hasNext) {
        // one record
        val record: ConsumerRecord[String, String] = iterator.next()
        val topic: String = record.topic()
        val partition: Int = record.partition()
        val offset: Long = record.offset()
        val key: String = record.key()
        val value: String = record.value()
        // defaults to the time the record was produced (the producer's system time)
        val ts: Long = record.timestamp()
        println(s"$topic\t$partition\t$offset\t$key\t$value\t$ts")
      }
    }

    // unreachable because of the infinite loop above
    consumer.close()
  }
}
Producing data from IDEA
package com.shujia

import java.util.Properties

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

object Demo1kafkaproducer {
  def main(args: Array[String]): Unit = {
    /**
     * 1. Create the Kafka connection, i.e. the producer
     */
    val properties = new Properties()
    // address of the Kafka brokers
    properties.setProperty("bootstrap.servers", "master:9092")
    // key and value serializer classes
    properties.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    properties.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

    // the producer
    val producer = new KafkaProducer[String, String](properties)

    // produce data
    // if the topic does not exist, it is auto-created with 1 partition and 1 replica
    val record = new ProducerRecord[String, String]("test1", "java")
    producer.send(record)

    // flush the data to Kafka
    producer.flush()

    // close the connection
    producer.close()
  }
}
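To verify that the record arrived, point the console consumer from the setup section at the auto-created test1 topic:

kafka-console-consumer.sh --bootstrap-server master:9092 --from-beginning --topic test1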
Producing the student data
package com.shujia

import java.util.Properties

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

import scala.io.Source

object Demo2Studentkafka {
  def main(args: Array[String]): Unit = {
    /**
     * 1. Create the Kafka connection, i.e. the producer
     */
    val properties = new Properties()
    // address of the Kafka brokers
    properties.setProperty("bootstrap.servers", "master:9092")
    // key and value serializer classes
    properties.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    properties.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

    // the producer
    val producer = new KafkaProducer[String, String](properties)

    // read the student table
    Source
      .fromFile("data/students.txt")
      .getLines()
      .foreach(student => {
        // send students of the same class to the same partition
        val clazz: String = student.split(",")(4)
        val partition: Int = math.abs(clazz.hashCode) % 2

        // send the data to Kafka; the topic was created with:
        // kafka-topics.sh --create --zookeeper master:2181 --replication-factor 1 --partitions 2 --topic student2
        val record = new ProducerRecord[String, String]("student2", partition, null, student)
        producer.send(record)
        producer.flush()
      })

    // close the connection
    producer.close()
  }
}
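To check how the records were distributed across the two partitions, describe the topic and then run Demo3Comsumer above, which subscribes to student2:

kafka-topics.sh --describe --zookeeper master:2181 --topic student2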
Thanks for reading. I'm 啊帅和和, a fourth-year big data student. Wishing you all the best.