IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> storm迁移flink研究——如何利用Flink对数据进行分流,按自定义逻辑分阶连续处理 -> 正文阅读

[大数据]storm迁移flink研究——如何利用Flink对数据进行分流,按自定义逻辑分阶连续处理

Storm迁移flink主要问题:

  1. Storm通过自定义的Bolt类实现自己的业务逻辑,如何在flink中实现

通过flink的ProcessFuction类实现,可以通过继承该类,在processElement方法中实现自己的业务逻辑。

  1. Storm按照业务类型分发数据处理的逻辑,如何在flink中实现

通过flink的旁路输出特性实现,对原始的数据流按照某些分类标准分类,输出到不同的子数据流中处理。

总体处理流程:

  1. Flink从Kafka中读取数据,作为初始数据流initDataStream;

  2. 对初始数据流进行分类处理,将子数据输出到数据流dataStream1、dataStream2、…、dataStreamn中;

  3. 按照自己的业务逻辑,对子数据流dataStream1_1、dataStream2_1、…、dataStreamn_1处理,得到dataStream1_2、dataStream2_2、…、dataStreamn_2,经过多次处理,最终得到dataStream1_x、dataStream2_y、…、dataStreamn_z

  4. 最后利用SinkFuction进行落地处理

例:利用flink对北京、上海两城市民信息进行过滤,过滤掉工资小于1000和大于20000的数据,然后按姓名_城市_收入的格式输出

输入:个人信息{姓名、年龄、性别、城市、收入}

输出:姓名_城市_收入,如:a_b_100

Person类:

public class Person {

    private String name;
    private Integer age;
    private Sex sex;
    private Integer salary;
    private Country country;
    //getter、setter...
}

enum Sex {
    Male,
    Female
}

enum Country {
    Beijing,
    Shanghai
}

生产者程序:

public class FlinkProducer {

    //kafka配置文件
    private static final Properties properties;

    static {
        //将kafka地址放入配置文件
        properties = new Properties();
        properties.put("bootstrap.servers", "10.225.173.107:9092,10.225.173.108:9092,10.225.173.109:9092");
    }

    public static void main(String[] args) throws Exception {
        //创建flink环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //连接kafka
        FlinkKafkaProducer<String> flinkKafkaProducer = new FlinkKafkaProducer<>("flink", new SimpleStringSchema(), properties);
        //初始化数据源
        String[] initData = getData();
        DataStreamSource<String> source = env.fromElements(initData);
        //发送数据
        source.addSink(flinkKafkaProducer);
        //执行flink程序
        env.execute();
    }

    //模拟数据源
    private static String[] getData() {
        String[] data = new String[100];
        for (int i = 0; i < 100; i++) {
            Person person = new Person();
            person.setName("name");
            person.setAge(18);
            //设置性别
            int sex = (int) (Math.random() * 100);
            if (sex % 2 == 0) {
                person.setSex(Sex.Female);
            } else {
                person.setSex(Sex.Male);
            }
            //设置城市
            int country = (int) (Math.random() * 100);
            if (country % 2 == 0) {
                person.setCountry(Country.Beijing);
            } else {
                person.setCountry(Country.Shanghai);
            }
            //设置收入
            person.setSalary((int) (Math.random() * 50000));
            data[i] = JsonUtils.serialize(person);
        }
        return data;
    }

}

消费者程序:

public class FlinkConsumer {

    //kafka配置文件
    private static final Properties properties;

    //男性标签
    private static final OutputTag<String> MALE = new OutputTag<>("male", Types.STRING);

    //女性标签
    private static final OutputTag<String> FEMALE = new OutputTag<>("female", Types.STRING);

    static {
        //将kafka地址放入配置文件
        properties = new Properties();
        properties.put("bootstrap.servers", "10.225.173.107:9092,10.225.173.108:9092,10.225.173.109:9092");
    }

    public static void main(String[] args) throws Exception {
        //1.创建执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //2.flink连接kafka并读取数据源
        FlinkKafkaConsumer<String> flinkKafkaConsumer = new FlinkKafkaConsumer<>("flink", new SimpleStringSchema(), properties);
        //3.获取初始数据源
        DataStreamSource<String> initDataStream = env.addSource(flinkKafkaConsumer);
        //4.对数据源进行分类处理
        SingleOutputStreamOperator<String> process = initDataStream.process(new SexSplitBolt());
        //5.分别获取仅仅包含男性、女性数据的数据流
        DataStream<String> dataStream_male = process.getSideOutput(MALE);
        DataStream<String> dataStream_female = process.getSideOutput(FEMALE);
        //6.过滤掉工资小于1000和大于20000的数据
        SingleOutputStreamOperator<String> dataStream_male_1 = dataStream_male.process(new SalarySplitBolt());
        SingleOutputStreamOperator<String> dataStream_female_1 = dataStream_female.process(new SalarySplitBolt());
        //7.给每个人的工资增加20%
        SingleOutputStreamOperator<String> dataStream_male_1_2 = dataStream_male_1.process(new SalaryAddBolt());
        SingleOutputStreamOperator<String> dataStream_female_1_2 = dataStream_female_1.process(new SalaryAddBolt());
        //8.进行落地处理,输出处理结果到控制台中
        dataStream_male_1_2.addSink(new OutputBolt());
        dataStream_female_1_2.addSink(new OutputBolt());
        //9.执行flink程序
        env.execute();
    }

    static class SexSplitBolt extends ProcessFunction<String, String> {

        @Override
        public void processElement(String s, ProcessFunction<String, String>.Context context, Collector<String> collector) {
            Person person = JsonUtils.deserialize(s, Person.class);
            if (person.getSex() == Sex.Male) {
                context.output(MALE, s);
            } else {
                context.output(FEMALE, s);
            }
        }
    }

    static class SalarySplitBolt extends ProcessFunction<String, String> {

        @Override
        public void processElement(String s, ProcessFunction<String, String>.Context context, Collector<String> collector) {
            Person person = JsonUtils.deserialize(s, Person.class);
            if (person.getSalary() < 1000 || person.getSalary() > 20000) {
                return;
            }
            collector.collect(s);
        }
    }

    static class SalaryAddBolt extends ProcessFunction<String, String> {

        @Override
        public void processElement(String s, ProcessFunction<String, String>.Context context, Collector<String> collector) {
            Person person = JsonUtils.deserialize(s, Person.class);
            person.setSalary((int) (person.getSalary() * 1.2));
            collector.collect(s);
        }
    }

    static class OutputBolt implements SinkFunction<String> {
        @Override
        public void invoke(String value, Context context) {
            Person person = JsonUtils.deserialize(value, Person.class);
            System.out.println("已处理数据:" + person.getName() + "_" + person.getCountry() + "_" + person.getSalary());
        }
    }
   
}

控制台输出:
已处理数据:name_Beijing_9320

已处理数据:name_Beijing_1049

已处理数据:name_Beijing_11548

已处理数据:name_Shanghai_16721

已处理数据:name_Beijing_3638

已处理数据:name_Shanghai_18862

已处理数据:name_Shanghai_8183

已处理数据:name_Shanghai_18627

已处理数据:name_Shanghai_16510

已处理数据:name_Beijing_18983

已处理数据:name_Beijing_10734

已处理数据:name_Shanghai_1594

已处理数据:name_Beijing_3640

已处理数据:name_Shanghai_17757

已处理数据:name_Shanghai_16700

已处理数据:name_Beijing_1175

已处理数据:name_Beijing_11788

已处理数据:name_Shanghai_15732

已处理数据:name_Beijing_15137

已处理数据:name_Beijing_12853

已处理数据:name_Beijing_11548

已处理数据:name_Shanghai_1188

已处理数据:name_Shanghai_3202

已处理数据:name_Shanghai_3986

已处理数据:name_Shanghai_8193

已处理数据:name_Beijing_16093

已处理数据:name_Beijing_8274

已处理数据:name_Beijing_3630

已处理数据:name_Beijing_11962

已处理数据:name_Beijing_14441

已处理数据:name_Shanghai_5312

已处理数据:name_Beijing_1815

已处理数据:name_Shanghai_11287

已处理数据:name_Beijing_5848

已处理数据:name_Beijing_11056

已处理数据:name_Shanghai_12407

已处理数据:name_Shanghai_12652

已处理数据:name_Beijing_19864

已处理数据:name_Shanghai_18935

已处理数据:name_Shanghai_16894

已处理数据:name_Shanghai_9732

36570 [TaskExecutorLocalStateStoresManager shutdown hook] INFO o.a.f.r.s.TaskExecutorLocalStateStoresManager - Shutting down TaskExecutorLocalStateStoresManager.


  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2022-03-04 15:39:43  更:2022-03-04 15:41:10 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/16 21:12:34-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码