IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> flink 初识问题总结 -> 正文阅读

[大数据]flink 初识问题总结

package com.test.flink;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class WordCountStreaming {


    public static class  Student {

        String name;
        Integer age;
        String sex;

        public Student(){

        }
        public Student(String name,Integer age ,String sex){
            this.name=name;
            this.age=age;
            this.sex=sex;
        }

        public String getName() {
            return name;
        }

        public void setName(String name) {
            this.name = name;
        }

        public Integer getAge() {
            return age;
        }

        public void setAge(Integer age) {
            this.age = age;
        }

        public String getSex() {
            return sex;
        }

        public void setSex(String sex) {
            this.sex = sex;
        }

    }

    /**
     *

    public static void main(String[] args) throws Exception {

        //步骤一:获取程序入口
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //步骤二:数据的输入
        DataStreamSource<String> dataStream = env.socketTextStream("127.0.0.1", 8888);
        //步骤三:数据的处理
        SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndone = dataStream.flatMap( new FlatMapFunction<String,
                        Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String line,
                                Collector<Tuple2<String, Integer>> collector) throws Exception {

                String[] fields = line.split(",");
                for (String word : fields) {
                    collector.collect(Tuple2.of(word, 1));
                }

            }
        });

        //2 tuple2
        //3 tuple3
        //4 tuple4
        SingleOutputStreamOperator<Tuple2<String, Integer>> result =
                wordAndone.keyBy(tuple -> tuple.f0)
                        .sum(1); //state

        //步骤四:数据的输出
        result.print();
        //步骤五:任务的启动
        env.execute("WordCount");

    }
     */
   // 2. 复杂代码优化 (将操作封装成类,将操作实例直接当参数传入)
   /** public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> res = env.socketTextStream( "127.0.0.1",8888 );
        SingleOutputStreamOperator<Tuple2<String,Integer>> flatMapRes = res.flatMap( new FlatMap());

        // 把key都拿出来 (只有key的流)
        KeyedStream<Tuple2<String,Integer>,String > keyStream = flatMapRes.keyBy( tuple2 -> tuple2.f0 );
        SingleOutputStreamOperator soutPut = keyStream.sum( 1 );
        System.out.println( keyStream.print( "222") +"========" );
        soutPut.print("111");

      //  flatMapRes.print();
        env.execute("WordCount");

    }
    */

   

   // 3. 复杂代码优化 (将数据字段封装成类)
   public static void main(String[] args) throws Exception {
       StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
       DataStreamSource<String> res = env.socketTextStream( "127.0.0.1",8888 );
       SingleOutputStreamOperator<Student> flatMapRes = res.flatMap( new FlatMapOptimize());

       // 把key都拿出来 (只有key的流)
       KeyedStream<Student,String > keyStream = flatMapRes.keyBy( student -> student.name );
       SingleOutputStreamOperator<Student> soutPut = keyStream.sum("age");
       System.out.println( keyStream.print( "222") +"========" );
       soutPut.print("111");

       //  flatMapRes.print();
       env.execute("WordCount");

   }


    //当字段太多时,不能再用Tuple类型的了,将数据字段封装成对象(Student)
   static   class FlatMapOptimize implements FlatMapFunction<String, Student>{

        @Override
        public void flatMap(String value, Collector <Student> out) throws Exception {
            String[] split = value.split( " " );
            for (int i = 0; i < split.length; i++) {
                out.collect( new Student(split[0],Integer.parseInt( split[1] ),split[2]));
            }
        }
    }


    static class FlatMap implements FlatMapFunction<String,Tuple2<String,Integer>>{

        @Override
        public void flatMap(String value, Collector <Tuple2 <String, Integer>> out) throws Exception {
            String[] split = value.split( " " );
            for (int i = 0; i < split.length; i++) {
                out.collect( Tuple2.of( split[i], 1 ));
            }
        }
    }
}


报错:

1.

 /**
     * log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
     * Exception in thread "main" org.apache.flink.api.common.typeutils.CompositeType$InvalidFieldReferenceException: Cannot reference field by field expression on GenericType<com.test.flink.WordCountStreaming.Student>Field expressions are only supported on POJO types, tuples, and case classes. (See the Flink documentation on what is considered a POJO.)
     * 	at org.apache.flink.streaming.util.typeutils.FieldAccessorFactory.getAccessor(FieldAccessorFactory.java:193)
     * 	at org.apache.flink.streaming.api.functions.aggregation.SumAggregator.<init>(SumAggregator.java:55)
     * 	at org.apache.flink.streaming.api.datastream.KeyedStream.sum(KeyedStream.java:781)
     * 	at com.test.flink.WordCountStreaming.main(WordCountStreaming.java:116)
     *
     */

?这种报错 是因为 实例类没有写默认的构造函数; (定义了带参数的构造函数,还要显式定义午餐构造函数)

2.?

222:4> com.test.flink.WordCountStreaming$Student@771a8d6f?

打印出来的是对象引用,可以直接在定义的雷伤重写toString 方法,将字段打印出来;

3.?

//flink 自带的参数读取器
ParameterTool tool =ParameterTool.fromArgs( args );
String host = tool.get( "host" );
System.out.println( host + " ");

idea中调试: 传入参数格式:?--host 127.0.0.1 --port 8888?

4. 开启web界面,方便本地查看,端口8081

  //开启web界面 方便查看结果
       StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
       //StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
     

5.并行度: 默认的并行子任务数是运行程序的电脑的cpu个数。

env.setParallelism( 16 );

前面>输出的编号表示在哪个task中;

?

socketTextStream source 算子并行度是1;?
keyBy 和 sum 作为一个算子,分配task;
当sink 和 前一个算子并行度
并行度和数据传输策略有关系。

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章           查看所有文章
加:2021-08-08 13:36:03  更:2021-08-08 13:36:05 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年5日历 -2024/5/17 19:23:49-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码