IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> Flink从Kafka读取数据 -> 正文阅读

[大数据]Flink从Kafka读取数据

Flink从Kafka读取数据

1、环境搭建,hadoop集群、zookeeper、kafka

2、IDEA程序编写+配置依赖

在IDEA中添加kafka配置依赖:

Apache Flink 附带了一个通用的 Kafka 连接器,它试图跟踪最新版本的 Kafka 客户端。它使用的客户端版本可能会在 Flink 版本之间发生变化。现代 Kafka 客户端向后兼容代理版本 0.10.0 或更高版本。关于Kafka兼容性的详细信息,请参考Kafka官方文档

<dependency>
	<groupId>org.apache.flink</groupId>
	<artifactId>flink-connector-kafka_2.11</artifactId>
	<version>1.12.3</version>
</dependency>

IDEA程序编写:

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;

import java.util.Properties;


public class Flink_Source_Kafka {
    public static void main(String[] args) throws Exception {
        //1.获取执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        //2.从kafka读取数据
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092");
        properties.put(ConsumerConfig.GROUP_ID_CONFIG,"comexercise");
        DataStreamSource<String> kafkaDS = env.addSource(new FlinkKafkaConsumer<String>("test", new SimpleStringSchema(), properties));
        
        //3.将数据打印
        kafkaDS.print();
        //4.执行任务
        env.execute();

    }
}

3、在kafka中创建一个生产者(producer)

在kafka的bin文件下输入命令:

kafka-console-producer.sh --broker-list hadoop102:9092 --topic test;

4、启动idea程序,在kafka生产者中输入数据

在这里插入图片描述

5、kafka常用命令:

1、查看所有的话题

./kafka-topics.sh --list --zookeeper localhost:9092

2、查看所有话题的详细信息

./kafka-topics.sh --zookeeper localhost:2181 --describe

3、列出指定话题的详细信息

./kafka-topics.sh --zookeeper localhost:2181 --describe  --topic demo

4、删除一个话题

./kafka-topics.sh --zookeeper localhost:2181 --delete  --topic test

5、创建一个叫test的话题,有两个分区,每个分区3个副本

./kafka-topics.sh --zookeeper localhost:2181 --create --topic test --replication-factor 3 --partitions 2

6、测试kafka发送和接收消息(启动两个终端)

#发送消息(注意端口号为配置文件里面的端口号)
./kafka-console-producer.sh --broker-list localhost:9092 --topic test
#消费消息(可能端口号与配置文件保持一致,或与发送端口保持一致)
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning   #加了--from-beginning 重头消费所有的消息
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test         #不加--from-beginning 从最新的一条消息开始消费

7、查看某个topic对应的消息数量

./kafka-run-class.sh  kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic test --time -1

8、显示所有消费者

./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list

9、获取正在消费的topic(console-consumer-63307)的group的offset

./kafka-consumer-groups.sh --describe --group console-consumer-63307 --bootstrap-server localhost:9092

10、显示消费者

./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list
--bootstrap-server localhost:9092

10、显示消费者

./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list
  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-08-23 16:45:39  更:2021-08-23 16:47:06 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年11日历 -2024/11/23 12:50:37-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码