IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> Flink数据分流&Flink与Kafka的连续读写交互 -> 正文阅读

[大数据]Flink数据分流&Flink与Kafka的连续读写交互

Flink的旁路输出特性可以用来对数据进行分流,通过是通过创建一个流的标签(OutputTag),再利用这个OutputTag标签对象作为参数,调用初始/父级数据流的getSideOutput(OutputTag)方法获取子数据流。

由于每个流标签都有一个id,因此不需要创建对象,只要流标签的id相同,其中的数据就相同。因此可以通过匿名内部类的形式来获取子数据流。

例:对初始字符串进行数据分类,将字母、数字、符号分发到不同的子数据流中进行处理。

public class Producer {

    public static void main(String[] args) throws Exception {
        //创建执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //配置kafka信息
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "10.225.173.107:9092,10.225.173.108:9092,10.225.173.109:9092");
        //创建初始数据
        DataStreamSource<String> source = env.fromElements("n1!u2m%3$b@*4e5r");
        //发送初始数据到kafka
        FlinkKafkaProducer<String> producer = new FlinkKafkaProducer<>("test", new SimpleStringSchema(), properties);
        source.addSink(producer);
        //执行程序
        env.execute();
    }
}

 

public class Consumer {

    public static void main(String[] args) throws Exception {
        //创建执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //配置kafka信息
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "10.225.173.107:9092,10.225.173.108:9092,10.225.173.109:9092");
        //获取初始数据
        FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>("test", new SimpleStringSchema(), properties);
        DataStreamSource<String> source = env.addSource(consumer);
        //对初始数据进行分发,此处用匿名内部类实现,也可以单独创建类写更复杂的逻辑
        SingleOutputStreamOperator<String> process = source.process(new ProcessFunction<String, String>() {
            @Override
            public void processElement(String s, ProcessFunction<String, String>.Context context, Collector<String> collector) {
                StringBuilder num = new StringBuilder();
                StringBuilder word = new StringBuilder();
                StringBuilder other = new StringBuilder();
                for (char c : s.toCharArray()) {
                    if (c >= '0' && c <= '9') {
                        num.append(c);
                    } else if (c >= 'a' && c <= 'z') {
                        word.append(c);
                    } else {
                        other.append(c);
                    }
                }
                //数字发送到id为num的流标签
                context.output(new OutputTag<>("num", Types.STRING), num.toString());
                //字母发送到id为word的流标签
                context.output(new OutputTag<>("word", Types.STRING), word.toString());
                //字符发送到id为other的流标签
                context.output(new OutputTag<>("other", Types.STRING), other.toString());
            }
        });
        process.getSideOutput(new OutputTag<>("num", Types.STRING)).print("这是数字:");
        process.getSideOutput(new OutputTag<>("word", Types.STRING)).print("这是字母:");
        process.getSideOutput(new OutputTag<>("other", Types.STRING)).print("这是字符:");
        //执行程序
        env.execute();
    }
}

先启动Consumer,再启动Producer即可。控制台输出:

这是数字::7> 12345

这是字母::7> number

这是字符::7> !%$@*

例:对Kafka的初始股票字符串进行解析处理,再发送、读取到其他主题。

public class Producer {

    public static void main(String[] args) throws Exception {
        //创建执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //配置kafka信息
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "10.225.173.107:9092,10.225.173.108:9092,10.225.173.109:9092");
        //创建初始数据
        DataStreamSource<String> source = env.fromElements("name:五粮液,code:000858,price:172");
        //发送初始数据到kafka
        FlinkKafkaProducer<String> producer = new FlinkKafkaProducer<>("test", new SimpleStringSchema(), properties);
        source.addSink(producer);
        //执行程序
        env.execute();
    }
}

 

public class Consumer {

    public static void main(String[] args) throws Exception {
        //创建执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //配置kafka信息
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "10.225.173.107:9092,10.225.173.108:9092,10.225.173.109:9092");
        //获取初始数据
        FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>("test", new SimpleStringSchema(), properties);
        DataStreamSource<String> source = env.addSource(consumer);
        //把股票信息转换为Stock实体类
        SingleOutputStreamOperator<String> process = source.process(new ProcessFunction<String, String>() {
            @Override
            public void processElement(String s, ProcessFunction<String, String>.Context context, Collector<String> collector) {
                Stock stock = new Stock();
                for (String info : s.split(",")) {
                        String[] split = info.split(":");
                        switch (split[0]) {
                            case "name":
                                stock.setName(split[1]);
                                break;
                            case "code":
                                stock.setCode(split[1]);
                                break;
                            case "price":
                                stock.setPrice(Integer.parseInt(split[1]));
                                break;
                        }
                }
                collector.collect(stock.toString());
            }
        });
        //把Stock信息发送到下一个Kafka主题
        FlinkKafkaProducer<String> producer = new FlinkKafkaProducer<>("stock", new SimpleStringSchema(), properties);
        process.addSink(producer);
        //从上一个主题获取stock信息
        FlinkKafkaConsumer<String> consumer_1 = new FlinkKafkaConsumer<>("stock", new SimpleStringSchema(), properties);
        env.addSource(consumer_1).print("输出股票信息:");
        //执行程序
        env.execute();
    }

    static class Stock {
        private String name;
        private String code;
        private Integer price;

        public String getName() {
            return name;
        }

        public void setName(String name) {
            this.name = name;
        }

        public String getCode() {
            return code;
        }

        @Override
        public String toString() {
            return "Stock{" +
                    "name='" + name + '\'' +
                    ", code='" + code + '\'' +
                    ", price=" + price +
                    '}';
        }

        public void setCode(String code) {
            this.code = code;
        }

        public Integer getPrice() {
            return price;
        }

        public void setPrice(Integer price) {
            this.price = price;
        }
    }
}

先启动Consumer,再启动Producer即可。控制台输出:

输出股票信息::3> Stock{name=‘五粮液’, code=‘000858’, price=172}


  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2022-03-11 22:17:06  更:2022-03-11 22:17:18 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/16 20:00:59-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码