一、背景
? ? ? ? flink做etl、大宽表、统计过程中有些小细节可以尝试优化,这里简单记录下:
二、场景
? ? ? ? 2.1 允许延迟的数据同步。比如线上订单库binlog同步到查询库,或者简单处理进入分析库,让分析师直接查询明细. 如果吞吐要求大一点,可以开启小批处理。
????????????????
# 这是SQL 参数,具体的根据业务自己调节
table.exec.mini-batch.enabled: 'true'
table.exec.mini-batch.size: '2000'
table.exec.mini-batch.binary.memory.size: 1mb
? ? ? ? 2.2 同 2.1类似,适合短时间高频率更新的业务,可以根据window进行过滤,减少存储端的更新频率。同样这种方法能减少下游维表的 join 的压力,相当于N秒更新的数据,内存压缩到1条
# 定义带自己的时间戳
CREATE TEMPORARY TABLE source_table (
id bigint,
name varchar,
ts AS PROCTIME()
);
# 这里用window 减少数据量
# 顺序可以用last_value,乱序用row_number + id + 时间处理
select
id,
last_value(name) as name
GROUP BY id,TUMBLE(ts, INTERVAL '5' SECOND);
2.3? 有些场景凌晨的时候partition 没数据过来,导致开启window的时候下游无法输出,从而导致凌晨后计算,数据不全
# 某些source partition没数据,等待N秒,忽略它,让数据继续往下
# 参考:WatermarkOutputMultiplexer 和 WatermarksWithIdleness 这两个类的 onPeriodicEmit 方法
table.exec.source.idle-timeout : 3s
2.4 如果用了group by 等场景,别忘记开启TTL,避免无限膨胀
table.exec.state.ttl: ....
2.5 由于opreator chain 优化,导致我们flinkUI 查询流量以及产生背压的情况不好找原因。可以临时开启:
# 会让算子不合并,吃N多资源,但是方便监控流量,以及算子瓶颈
# 由于SQL 算子的并行度调节目前1.12 没有做,所有一般是调大并行度解决
pipeline.operator-chaining: true
2.6 top-n 优化
? ? ? a.如果类目不多,可以直接count? group by 类目,累加到类似数据库存储处理
? ? ? ? ?更新频率比较高,同理可以用小batch 减少更新。
? ? ? b. 如果类目较多(比如100W+),?SQL有过优化了, 里面只是状态会维护得比较大一点
? ? ? c.如果类目及其多(1亿+),可以自定义一个累加算子,然后存储接分布式的高性能库、或者redis、ES都行.?参考微博的top排行。
2.7? ?join 优化
1.如果是多维表、同一个数据库、主键join。开发一个单独的join插件,可以传入SQL 一次? join多条。
2. join 比较新的数据,高频数据,最好接入一个公共服务,有服务提供公共层、缓存、更新等操作。同时也能减少对数据库的连接
小结:
? ? ? ?这里仅记录分享部分优化过程,希望对新入门的有一定借鉴作用。
? ? ? ?其他更多调优得看文档和里面代码了。特殊逻辑最好自己实现比较优秀。
|