IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> Kafka3.0学习—外部系统集成 -> 正文阅读

[大数据]Kafka3.0学习—外部系统集成

一、集成Flume

Flume经常和Kafka一起使用,即可以作为生产者将数据发送到Kafka中,也可以作为消费者从Kafka中获取数据。
在这里插入图片描述

1.Flume生产者

场景:
使用Flume监控Linux某个文件夹下的文件,然后将监控到的数据发送到Kafka中。
在这里插入图片描述
这里最主要的就是Flume的配置:

# 1 组件定义
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# 2 配置 source
a1.sources.r1.type = TAILDIR
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /opt/module/applog/app.*
a1.sources.r1.positionFile = /opt/module/flume/taildir_position.json
# 3 配置 channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# 4 配置 sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092
a1.sinks.k1.kafka.topic = first
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1
# 5 拼接组件
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

直接在Flume中运行这个配置文件即可,然后在/opt/module/applog目录下对文件新增内容,Kafka就可以接收到消息。

2.Flume消费者

场景:
Kafka中的某个主题需要通过Flume传输到别的地方比如打印到控制台。
在这里插入图片描述
最重要的也是配置Flume:

# 1 组件定义
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# 2 配置 source
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.batchSize = 50
a1.sources.r1.batchDurationMillis = 200
a1.sources.r1.kafka.bootstrap.servers = hadoop102:9092
a1.sources.r1.kafka.topics = first
a1.sources.r1.kafka.consumer.group.id = custom.g.id
# 3 配置 channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# 4 配置 sink
a1.sinks.k1.type = logger

启动Flume和Kafka的生产者,向Kafka中输入数据,就可以在控制台看到打印的数据。

二、集成Flink

在Flink中经常需要和Kafka配合使用,当然Flink也可以作为Kafka的生产者和消费者。

1.Flink生产者

代码如下:

public class FlinkKafkaProducer1 {
    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(3);

        // 1 读取集合中数据
        ArrayList<String> wordsList = new ArrayList<>();
        wordsList.add("hello");
        wordsList.add("world");
        DataStreamSource<String> streamSource = env.fromCollection(wordsList);

        //2 创建Kafka生产者
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092,hadoop103:9092");
        FlinkKafkaProducer<String> kafkaProducer = new FlinkKafkaProducer<String>("first",new SimpleStringSchema(),properties);

        streamSource.addSink(kafkaProducer);

        env.execute();
    }
}

2.Flink消费者

代码如下:

public class FlinkKafkaConsumer1 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(3);


        //创建Kafka消费者
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092,hadoop103:9092");
        properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test");

        FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<String>("first",new SimpleStringSchema(),properties);

        DataStreamSource<String> streamSource = env.addSource(kafkaConsumer);

        streamSource.print();

        env.execute();
    }
}

这里不写group.id不会报错,最好还是写上。

三、集成SpringBoot

SpringBoot也可以用于Kafka的生产者和消费者。

1.生产者

SpringBoot中使用Kafka模板类KafkaTemplate来向Kafka发送消息,主要代码:

@RestController
public class ProducerController {

    //Kafka模板用来向Kafka发送数据
    @Autowired
    KafkaTemplate<String,String> kafkaTemplate;

    @RequestMapping("/ssl")
    public String date(String msg){
        kafkaTemplate.send("first",msg);
        return "ok";
    }
}

SpringBoot的配置文件如下:

#SpringBoot应用名称
spring.application.name=ssl_springboot_kafka

#Kafka的连接地址
spring.kafka.bootstrap-servers=hadoop102:9092,hadoop103:9092,hadoop104:9092

#Key和Value的序列化方式
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringDeserializer

2.消费者

SpringBoot主要使用@KafkaListener这个注解来实现对Kafka的消费,主要代码如下:

@Configuration
public class KafkaConsumer {
    
    //指定要坚挺的topic
    @KafkaListener(topics = "first")
    public void consumerTopic(String msg){
        System.out.println("收到的信息: "+msg);
    }
}

SpringBoot消费者配置文件内容如下:

#SpringBoot应用名称
spring.application.name=ssl_springboot_kafka

#Kafka的连接地址
spring.kafka.bootstrap-servers=hadoop102:9092,hadoop103:9092,hadoop104:9092

#Key和Value的反序列化方式
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer

#指定消费者组的id
spring.kafka.consumer.group-id=ssl
  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2022-05-07 11:15:14  更:2022-05-07 11:16:48 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/16 7:40:43-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码