1- Object reuse
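import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.connectors.wikiedits.WikipediaEditEvent;
import org.apache.flink.util.Collector;

// stream: a WindowedStream keyed by user name (String) with TimeWindow windows
stream
    .apply(new WindowFunction<WikipediaEditEvent, Tuple2<String, Long>, String, TimeWindow>() {
        @Override
        public void apply(String userName, TimeWindow timeWindow, Iterable<WikipediaEditEvent> iterable, Collector<Tuple2<String, Long>> collector) throws Exception {
            long changesCount = ...
            // A new Tuple2 instance is created on every execution
            collector.collect(new Tuple2<>(userName, changesCount));
        }
    });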
The code above can be optimized as follows, which avoids allocating a new Tuple2 for every window result:
stream
    .apply(new WindowFunction<WikipediaEditEvent, Tuple2<String, Long>, String, TimeWindow>() {
        // Allocated once per function instance and reused for every window result
        private final Tuple2<String, Long> result = new Tuple2<>();

        @Override
        public void apply(String userName, TimeWindow timeWindow, Iterable<WikipediaEditEvent> iterable, Collector<Tuple2<String, Long>> collector) throws Exception {
            long changesCount = ...
            // Overwrite the fields instead of creating a new Tuple2
            result.f0 = userName;
            result.f1 = changesCount;
            collector.collect(result);
        }
    });
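Why reusing the output object works, and where it stops working, can be shown without Flink. The plain-Java sketch below uses `SimpleCollector` and `MutablePair` as hypothetical stand-ins for Flink's `Collector` and `Tuple2`; they are not the Flink API. The reuse in the optimized snippet relies on the runtime copying or serializing each emitted record before the next call. As we understand Flink's documented object-reuse behavior, that holds in the default mode, but treat it as an assumption to verify for your version and for jobs that call `enableObjectReuse()`. A consumer that only keeps references sees every entry overwritten by the last value.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for Flink's Collector (not the Flink API).
interface SimpleCollector<T> {
    void collect(T record);
}

// Mutable pair with public fields, loosely modeled on Tuple2's f0/f1.
final class MutablePair {
    String f0;
    long f1;

    MutablePair copy() {
        MutablePair p = new MutablePair();
        p.f0 = f0;
        p.f1 = f1;
        return p;
    }

    @Override
    public String toString() {
        return "(" + f0 + "," + f1 + ")";
    }
}

class ObjectReuseDemo {
    // Emits one pair per user while reusing a single MutablePair instance.
    static void emitReused(String[] users, long[] counts, SimpleCollector<MutablePair> out) {
        MutablePair result = new MutablePair(); // allocated once
        for (int i = 0; i < users.length; i++) {
            result.f0 = users[i];
            result.f1 = counts[i];
            out.collect(result); // the same instance on every call
        }
    }

    static List<String> render(List<MutablePair> pairs) {
        List<String> rendered = new ArrayList<>();
        for (MutablePair p : pairs) {
            rendered.add(p.toString());
        }
        return rendered;
    }

    // Consumer that stores references: all entries alias the reused object.
    static List<String> collectByReference(String[] users, long[] counts) {
        List<MutablePair> stored = new ArrayList<>();
        emitReused(users, counts, stored::add);
        return render(stored);
    }

    // Consumer that copies on receipt, as a copying/serializing runtime would.
    static List<String> collectByCopy(String[] users, long[] counts) {
        List<MutablePair> stored = new ArrayList<>();
        emitReused(users, counts, p -> stored.add(p.copy()));
        return render(stored);
    }

    public static void main(String[] args) {
        String[] users = {"alice", "bob"};
        long[] counts = {3, 5};
        System.out.println(collectByReference(users, counts)); // [(bob,5), (bob,5)]
        System.out.println(collectByCopy(users, counts));      // [(alice,3), (bob,5)]
    }
}
```

The saving is one allocation per emitted record; the cost is that any code holding on to an emitted record must copy it first.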
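2- Data skew
- rebalance(): redistribute records round-robin across the downstream subtasks
- custom partitioner (partitionCustom): route hot keys explicitly
- key + random prefix/suffix (salting): aggregate on the salted key, then aggregate again on the original key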
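The salting item is usually paired with a two-stage aggregation: count per salted key first, so that one hot key is spread over several sub-keys (and therefore several subtasks), then strip the salt and combine the partial counts per original key. The plain-Java sketch below shows only that logic, not Flink code; the `#` separator, `SALT_BUCKETS = 4`, and the class and method names are hypothetical choices for illustration. In a Flink job the two stages would roughly correspond to two keyed aggregations, the first keyed on the salted key and the second on the original key.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;

class SaltedAggregation {
    // Hypothetical number of salt values; in practice tuned to the degree of skew.
    static final int SALT_BUCKETS = 4;

    // Stage 1: append a random suffix so one hot key becomes up to SALT_BUCKETS sub-keys.
    static Map<String, Long> partialCounts(List<String> keys, Random rnd) {
        Map<String, Long> partial = new HashMap<>();
        for (String key : keys) {
            String salted = key + "#" + rnd.nextInt(SALT_BUCKETS);
            partial.merge(salted, 1L, Long::sum);
        }
        return partial;
    }

    // Stage 2: strip the suffix (after the last '#') and sum the partial counts per original key.
    static Map<String, Long> finalCounts(Map<String, Long> partial) {
        Map<String, Long> result = new HashMap<>();
        for (Map.Entry<String, Long> e : partial.entrySet()) {
            String salted = e.getKey();
            String original = salted.substring(0, salted.lastIndexOf('#'));
            result.merge(original, e.getValue(), Long::sum);
        }
        return result;
    }

    public static void main(String[] args) {
        // 1000 records for one hot key, 10 for a cold key.
        List<String> keys = new ArrayList<>(Collections.nCopies(1000, "hot"));
        keys.addAll(Collections.nCopies(10, "cold"));
        Map<String, Long> partial = partialCounts(keys, new Random(42));
        System.out.println("sub-keys after salting: " + partial.size());
        System.out.println(finalCounts(partial)); // contains hot=1000 and cold=10
    }
}
```

The final counts are the same as without salting; only the distribution of work in the first stage changes.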
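3- Set parallelism appropriately
1. ds.writeAsText("data/output/result1").setParallelism(1); # operator level
2. env.setParallelism(1); # execution environment level
3. At submission time, in the web UI or with the command-line parameter: flink run -p 10 # client level
4. In the configuration file flink-conf.yaml: parallelism.default: 1 # system level (global)
Priority: 1 > 2 > 3 > 4 (a setting at a higher-priority level overrides the lower ones)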
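The priority rule can be modeled as a fallback chain: use the most specific level that is set, otherwise fall through to the next. The helper below is a hypothetical illustration of that rule, not Flink's own resolution code; an empty `OptionalInt` means the level was not set, and the system-level value stands in for `parallelism.default`.

```java
import java.util.OptionalInt;

class ParallelismResolver {
    // Effective parallelism under the priority
    // operator > execution environment > client (-p) > flink-conf.yaml parallelism.default.
    static int resolve(OptionalInt operator, OptionalInt environment,
                       OptionalInt client, int systemDefault) {
        if (operator.isPresent()) {
            return operator.getAsInt();
        }
        if (environment.isPresent()) {
            return environment.getAsInt();
        }
        if (client.isPresent()) {
            return client.getAsInt();
        }
        return systemDefault;
    }

    public static void main(String[] args) {
        // Operator sets 1 and the job is submitted with -p 10: the operator setting wins.
        System.out.println(resolve(OptionalInt.of(1), OptionalInt.empty(), OptionalInt.of(10), 1)); // 1
        // Only -p 10 and the config default: the client setting wins.
        System.out.println(resolve(OptionalInt.empty(), OptionalInt.empty(), OptionalInt.of(10), 1)); // 10
    }
}
```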