My tested environment: CentOS 7.3 + ZooKeeper + Kafka (install JDK 1.8 yourself). Without further ado, let's get started.
1. ZooKeeper installation and deployment (included in the attachment: zookeeper-3.4.11.tar.gz)
# Download ZooKeeper
wget http://mirror.bit.edu.cn/apache/zookeeper/stable/zookeeper-3.4.11.tar.gz
# Extract
tar -zxvf zookeeper-3.4.11.tar.gz
# Move to /usr/local/
mv zookeeper-3.4.11/ /usr/local/
# Rename the directory
mv /usr/local/zookeeper-3.4.11/ /usr/local/zookeeper/
# Create the data directory
mkdir /usr/local/zookeeper/data/
# Create the configuration file from the sample
cp /usr/local/zookeeper/conf/zoo_sample.cfg /usr/local/zookeeper/conf/zoo.cfg
Edit /usr/local/zookeeper/conf/zoo.cfg and add the line dataDir=/usr/local/zookeeper/data. The full configuration then looks like this:
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/usr/local/zookeeper/data
clientPort=2181
Start ZooKeeper with the following commands and check its status.
# Start ZooKeeper
/usr/local/zookeeper/bin/zkServer.sh start
# Check status
/usr/local/zookeeper/bin/zkServer.sh status
After a successful start, the status command reports something like the following.
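On a single node you should see Mode: standalone; the exact paths depend on your installation, but the output is roughly:
ZooKeeper JMX enabled by default
Using config: /usr/local/zookeeper/bin/../conf/zoo.cfg
Mode: standalone
As an extra check (assuming nc is installed), the four-letter command ruok should be answered with imok:
# quick health check
echo ruok | nc localhost 2181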
2. Kafka installation and deployment (included in the attachment: kafka_2.10-0.9.0.0.tgz)
# Download Kafka
wget https://archive.apache.org/dist/kafka/0.9.0.0/kafka_2.10-0.9.0.0.tgz
# Extract Kafka
tar -zxvf kafka_2.10-0.9.0.0.tgz
# Copy to /usr/local
cp -R kafka_2.10-0.9.0.0 /usr/local
# Rename
mv /usr/local/kafka_2.10-0.9.0.0/ /usr/local/kafka/
Edit /usr/local/kafka/config/server.properties; leave the other settings unchanged, but make sure the following parameters are set:
broker.id=0
host.name=<host IP>
listeners=PLAINTEXT://:9092
# so that Kafka can be reached from remote clients
advertised.listeners=PLAINTEXT://<host IP>:9092
# only used when advertised.listeners is not set
advertised.host.name=localhost
zookeeper.connect=localhost:2181
Edit /usr/local/kafka/bin/kafka-server-start.sh and change the heap setting as shown below (if your machine has enough memory, this step is not necessary).
export KAFKA_HEAP_OPTS="-Xmx256M -Xms128M"
Start Kafka, create a topic, and run a quick test.
# Start Kafka
/usr/local/kafka/bin/kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties
# Create a topic named test
/usr/local/kafka/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
# Send data (producer)
/usr/local/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
# Receive data (consumer)
/usr/local/kafka/bin/kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic test
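Lines typed into the producer console should show up in the consumer console; press Ctrl+C to exit either tool. If nothing arrives, first make sure the topic was actually created (a quick check, assuming the same install path as above):
# list all topics registered in ZooKeeper
/usr/local/kafka/bin/kafka-topics.sh --list --zookeeper localhost:2181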
3. Java code (Flink reads Kafka data in real time and writes timed batch aggregates to MySQL; source code included in the attachment)
(1) The Student entity class.
public class Student {
    public int id;
    public String name;
    public String password;
    public int age;

    public Student() {
    }

    public Student(int id, String name, String password, int age) {
        this.id = id;
        this.name = name;
        this.password = password;
        this.age = age;
    }

    @Override
    public String toString() {
        return "Student{" +
                "id=" + id +
                ", name='" + name + '\'' +
                ", password='" + password + '\'' +
                ", age=" + age +
                '}';
    }

    public int getId() {
        return id;
    }

    public void setId(int id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public String getPassword() {
        return password;
    }

    public void setPassword(String password) {
        this.password = password;
    }

    public int getAge() {
        return age;
    }

    public void setAge(int age) {
        this.age = age;
    }
}
(2) The main class Main2 (replace the marked parameters with your own values).
public class Main2 {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties props = new Properties();
        props.put("bootstrap.servers", "<Kafka broker IP>:9092");
        props.put("zookeeper.connect", "<ZooKeeper IP>:2181");
        // existing consumer groups can be listed with:
        // ./kafka-consumer-groups.sh --zookeeper localhost:2181 --list
        props.put("group.id", "console-consumer-91899"); // replace with your own group id
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("auto.offset.reset", "latest");

        // Read JSON strings from the "test" topic and map them to Student objects
        SingleOutputStreamOperator<Student> student = env.addSource(new FlinkKafkaConsumer09<>(
                "test",
                new SimpleStringSchema(),
                props)).setParallelism(1)
                .map(string -> JSON.parseObject(string, Student.class));

        // Collect the records received from Kafka within each 1-minute window into a list
        student.timeWindowAll(Time.minutes(1)).apply(new AllWindowFunction<Student, List<Student>, TimeWindow>() {
            @Override
            public void apply(TimeWindow window, Iterable<Student> values, Collector<List<Student>> out) throws Exception {
                ArrayList<Student> students = Lists.newArrayList(values);
                if (students.size() > 0) {
                    System.out.println("Number of student records collected in 1 minute: " + students.size());
                    out.collect(students);
                }
            }
        }).addSink(new SinkToMySQL());

        env.execute("Flink add sink");
    }
}
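For reference, these are the imports Main2 relies on. This is only a sketch: the exact packages (notably SimpleStringSchema) differ slightly between Flink versions, so resolve them against the flink-streaming-java, flink-connector-kafka-0.9, fastjson and guava dependencies on your own classpath.
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import com.alibaba.fastjson.JSON;
import com.google.common.collect.Lists;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
import org.apache.flink.util.Collector;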
(3) The SinkToMySQL class (writes the batched data to MySQL; replace the marked parameters with your own values).
public class SinkToMySQL extends RichSinkFunction<List<Student>> {
    PreparedStatement ps;
    BasicDataSource dataSource;
    private Connection connection;

    /**
     * Establish the connection in open() so it does not have to be created
     * and released on every invoke() call.
     *
     * @param parameters
     * @throws Exception
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        dataSource = new BasicDataSource();
        connection = getConnection(dataSource);
        String sql = "insert into Student(id, name, password, age) values(?, ?, ?, ?)";
        ps = this.connection.prepareStatement(sql);
    }

    @Override
    public void close() throws Exception {
        super.close();
        // release the statement and the connection
        if (ps != null) {
            ps.close();
        }
        if (connection != null) {
            connection.close();
        }
    }

    /**
     * invoke() is called once for every element the sink receives,
     * i.e. once per 1-minute batch of students.
     *
     * @param value
     * @param context
     * @throws Exception
     */
    @Override
    public void invoke(List<Student> value, Context context) throws Exception {
        // add every student in the batch to the prepared statement
        for (Student student : value) {
            ps.setInt(1, student.getId());
            ps.setString(2, student.getName());
            ps.setString(3, student.getPassword());
            ps.setInt(4, student.getAge());
            ps.addBatch();
        }
        // execute the whole batch at once
        int[] count = ps.executeBatch();
        System.out.println("Successfully inserted " + count.length + " rows");
    }

    private static Connection getConnection(BasicDataSource dataSource) {
        dataSource.setDriverClassName("com.mysql.jdbc.Driver");
        // NOTE: replace with your own MySQL address, user name and password
        dataSource.setUrl("jdbc:mysql://localhost:3306/<database name>");
        dataSource.setUsername("<database user>");
        dataSource.setPassword("<database password>");
        // connection pool parameters
        dataSource.setInitialSize(10);
        dataSource.setMaxTotal(50);
        dataSource.setMinIdle(2);

        Connection con = null;
        try {
            con = dataSource.getConnection();
            System.out.println("Created connection pool: " + con);
        } catch (Exception e) {
            System.out.println("-----------mysql get connection has exception , msg = " + e.getMessage());
        }
        return con;
    }
}
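The sink inserts into a table named Student, which must already exist in the target database. As a minimal sketch (the schema below is an assumption; adjust the column types to your needs), the table can be created once with plain JDBC:
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;

public class CreateStudentTable {
    public static void main(String[] args) throws Exception {
        // replace URL, user and password with your own MySQL settings
        try (Connection con = DriverManager.getConnection(
                "jdbc:mysql://localhost:3306/<database name>", "<database user>", "<database password>");
             Statement stmt = con.createStatement()) {
            // column names match the INSERT statement used in SinkToMySQL
            stmt.executeUpdate("CREATE TABLE IF NOT EXISTS Student ("
                    + "id INT PRIMARY KEY, "
                    + "name VARCHAR(64), "
                    + "password VARCHAR(64), "
                    + "age INT)");
        }
    }
}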
(4) The KafkaDataMonitor class (simulates sending data to Kafka; replace the marked parameters with your own values).
public class KafkaDataMonitor {
    public static final String broker_list = "<Kafka broker IP>:9092";
    public static final String topic = "test"; // the Kafka topic created earlier

    public static void main(String[] args) throws InterruptedException {
        writeToKafka();
    }

    public static void writeToKafka() throws InterruptedException {
        Properties props = new Properties();
        props.put("bootstrap.servers", broker_list);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        // send 20 simulated Student records as JSON strings
        for (int i = 1; i <= 20; i++) {
            Student student = new Student(i, "test" + i, "password" + i, 18 + i);
            ProducerRecord<String, String> record = new ProducerRecord<>(topic, null, null, JSON.toJSONString(student));
            producer.send(record);
            System.out.println("send data: " + JSON.toJSONString(student));
        }
        producer.flush();
    }
}
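Each message sent to the test topic is simply the fastjson serialization of a Student, so a single record looks roughly like this (field order may vary):
{"age":19,"id":1,"name":"test1","password":"password1"}
This is also the format that Main2 expects when it calls JSON.parseObject, so anything written manually to the test topic (for example via the console producer) should follow the same shape.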
(5) Start the main method of Main2 and wait for it to come up. Then start the main method of KafkaDataMonitor to simulate writing data to Kafka. The console then logs how many Student records were collected within the one-minute window and how many rows were inserted.
Connect to the MySQL database with Navicat and you can see that all 20 simulated records written to Kafka within that minute have been successfully inserted into MySQL.
Appendix: source code + environment deployment files (see attachment).