package com.test.flink;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
public class WordCountStreaming {

    /**
     * Plain data holder used instead of a TupleN once a record has too many fields.
     *
     * <p>To qualify as a Flink POJO — which is required for field-expression
     * aggregations such as {@code keyedStream.sum("age")} — the class must be
     * public, have a public no-argument constructor, and expose every field
     * either publicly or through public getters/setters. Otherwise Flink falls
     * back to {@code GenericType} and {@code sum("age")} throws
     * {@code InvalidFieldReferenceException} (see the error notes below the class).
     */
    public static class Student {
        String name;
        Integer age;
        String sex;

        /** Public no-arg constructor: mandatory for Flink's POJO serializer. */
        public Student() {
        }

        public Student(String name, Integer age, String sex) {
            this.name = name;
            this.age = age;
            this.sex = sex;
        }

        public String getName() {
            return name;
        }

        public void setName(String name) {
            this.name = name;
        }

        public Integer getAge() {
            return age;
        }

        public void setAge(Integer age) {
            this.age = age;
        }

        public String getSex() {
            return sex;
        }

        public void setSex(String sex) {
            this.sex = sex;
        }

        /**
         * Without this override {@code print()} shows only the object reference
         * (e.g. {@code WordCountStreaming$Student@771a8d6f}) instead of the data.
         */
        @Override
        public String toString() {
            return "Student{name='" + name + "', age=" + age + ", sex='" + sex + "'}";
        }
    }

    /*
     * 1. Initial version: anonymous inner FlatMapFunction, Tuple2 records.
     *
    public static void main(String[] args) throws Exception {
        // Step 1: obtain the execution environment (the program entry point).
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Step 2: source — read text lines from a local socket.
        DataStreamSource<String> dataStream = env.socketTextStream("127.0.0.1", 8888);
        // Step 3: transform — split each line into (word, 1) pairs.
        SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndone = dataStream.flatMap(
                new FlatMapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public void flatMap(String line,
                                        Collector<Tuple2<String, Integer>> collector) throws Exception {
                        String[] fields = line.split(",");
                        for (String word : fields) {
                            collector.collect(Tuple2.of(word, 1));
                        }
                    }
                });
        // Tuple2/Tuple3/Tuple4 hold 2/3/4 fields respectively.
        SingleOutputStreamOperator<Tuple2<String, Integer>> result =
                wordAndone.keyBy(tuple -> tuple.f0)
                        .sum(1); // keyed state holds the running count
        // Step 4: sink — print the results.
        result.print();
        // Step 5: launch the job.
        env.execute("WordCount");
    }
    */

    /*
     * 2. Refactoring step: extract the anonymous function into a named class
     *    (FlatMap) and pass an instance of it directly.
     *
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> res = env.socketTextStream("127.0.0.1", 8888);
        SingleOutputStreamOperator<Tuple2<String, Integer>> flatMapRes = res.flatMap(new FlatMap());
        // Partition the stream by the word (the key).
        KeyedStream<Tuple2<String, Integer>, String> keyStream = flatMapRes.keyBy(tuple2 -> tuple2.f0);
        SingleOutputStreamOperator soutPut = keyStream.sum(1);
        keyStream.print("222");
        soutPut.print("111");
        env.execute("WordCount");
    }
    */

    // 3. Refactoring step: wrap the record fields in a dedicated class (Student)
    //    instead of a Tuple type.
    public static void main(String[] args) throws Exception {
        // Step 1: obtain the execution environment.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Step 2: source — read lines of "name age sex" from a local socket.
        DataStreamSource<String> res = env.socketTextStream("127.0.0.1", 8888);
        // Step 3: parse each line into a Student record.
        SingleOutputStreamOperator<Student> flatMapRes = res.flatMap(new FlatMapOptimize());
        // Partition by student name so per-name state lands on the same task.
        KeyedStream<Student, String> keyStream = flatMapRes.keyBy(student -> student.name);
        // Running sum over the "age" field; requires Student to be a valid Flink POJO.
        SingleOutputStreamOperator<Student> soutPut = keyStream.sum("age");
        // print() registers a sink on the dataflow graph; the records themselves
        // are only printed once the job runs. (Printing the returned sink object
        // via System.out, as before, only showed a meaningless object reference.)
        keyStream.print("222");
        soutPut.print("111");
        // Step 5: launch the job.
        env.execute("WordCount");
    }

    /**
     * Parses one space-separated line ("name age sex") into a single Student.
     * When a record has too many fields a Tuple type becomes unwieldy, so the
     * fields are wrapped in a dedicated class instead.
     */
    static class FlatMapOptimize implements FlatMapFunction<String, Student> {
        @Override
        public void flatMap(String value, Collector<Student> out) throws Exception {
            String[] split = value.split(" ");
            // One input line is one record: emit exactly one Student.
            // (The previous version looped split.length times but always read
            // split[0..2], emitting the same Student N times — and crashed with
            // ArrayIndexOutOfBoundsException on lines with fewer than 3 fields.)
            if (split.length >= 3) {
                out.collect(new Student(split[0], Integer.parseInt(split[1]), split[2]));
            }
        }
    }

    /** Splits a line on spaces and emits a (word, 1) pair for each word. */
    static class FlatMap implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
            for (String word : value.split(" ")) {
                out.collect(Tuple2.of(word, 1));
            }
        }
    }
}
报错:
1.
/**
* log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
* Exception in thread "main" org.apache.flink.api.common.typeutils.CompositeType$InvalidFieldReferenceException: Cannot reference field by field expression on GenericType<com.test.flink.WordCountStreaming.Student>Field expressions are only supported on POJO types, tuples, and case classes. (See the Flink documentation on what is considered a POJO.)
* at org.apache.flink.streaming.util.typeutils.FieldAccessorFactory.getAccessor(FieldAccessorFactory.java:193)
* at org.apache.flink.streaming.api.functions.aggregation.SumAggregator.<init>(SumAggregator.java:55)
* at org.apache.flink.streaming.api.datastream.KeyedStream.sum(KeyedStream.java:781)
* at com.test.flink.WordCountStreaming.main(WordCountStreaming.java:116)
*
*/
这种报错是因为实体类不满足 Flink POJO 的要求;(定义了带参数的构造函数后,还要显式定义无参构造函数,否则 Flink 无法按字段表达式做 sum("age") 这类聚合)
2.
222:4> com.test.flink.WordCountStreaming$Student@771a8d6f
打印出来的是对象引用,可以直接在定义的类上重写 toString 方法,将字段打印出来;
3.
//flink 自带的参数读取器
ParameterTool tool =ParameterTool.fromArgs( args );
String host = tool.get( "host" );
System.out.println( host + " ");
IDEA 中调试时传入参数的格式: --host 127.0.0.1 --port 8888
4. 开启web界面,方便本地查看,端口8081
//开启web界面 方便查看结果
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
//StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
5.并行度: 默认的并行子任务数是运行程序的电脑的cpu个数。
env.setParallelism( 16 );

输出前面的 "N>" 编号表示该条记录是在哪个并行子任务(task)中处理的;
socketTextStream source 算子的并行度固定是 1;
keyBy 和 sum 作为一个算子,分配 task;
当 sink 和前一个算子的并行度相同时,二者可以串联(chain)到同一个 task 中执行;
并行度和算子之间的数据传输(分区)策略有关系。
|