[Big Data] Flinkx
Architecture
Supported task types
Basic principles
Deployment
Job configuration
    {
      "job": {
        "content": [
          {
            "reader": {
              "parameter": {
                "username": "root",
                "password": "123456",
                "fetchSize": 10,
                "connection": [{
                  "jdbcUrl": ["jdbc:mysql://127.0.0.1:3306/rpl?useUnicode=true&characterEncoding=utf8"],
                  "table": ["flink_test"]
                }],
                "column": ["id","name","age"],
                "customSql": "",
                "where": "id > 0",
                "splitPk": "id",
                "startLocation": "20",
                "queryTimeOut": 1000,
                "requestAccumulatorInterval": 2
              },
              "name": "mysqlreader"
            },
            "writer": {
              "name": "mysqlwriter",
              "parameter": {
                "username": "root",
                "password": "123456",
                "connection": [
                  {
                    "jdbcUrl": "jdbc:mysql://127.0.0.1:3306/rpl?useUnicode=true&characterEncoding=utf8",
                    "table": ["flink_test_dst"]
                  }
                ],
                // "preSql": ["truncate table flink_test_dst;"],
                "postSql": [],
                "writeMode": "replace",
                "column": [
                  { "name": "id", "type": "BIGINT" },
                  { "name": "name", "type": "varchar" },
                  { "name": "age", "type": "INT" }
                ]
              }
            }
          }
        ],
        "setting": {
          "speed": {
            // "channel": 1,
            "readerChannel": 3,
            "writerChannel": 3,
            "bytes": 0
          },
          "errorLimit": {
            "record": 100
          },
          "restore": {
            "maxRowNumForCheckpoint": 10,
            "isRestore": true,
            "restoreColumnName": "id",
            "restoreColumnIndex": 0
          },
          "log": {
            "isLogger": false,
            "level": "debug",
            "path": "",
            "pattern": ""
          }
        }
      }
    }

MySQL to MySQL: full sync
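The job configuration above sets `splitPk` to `id` and `readerChannel` to 3, and the reader SQL in this section shards rows with a modulo predicate on that key. The sketch below illustrates the idea by generating one query per channel. `SplitQuerySketch` and `buildSplitQueries` are hypothetical names used only for illustration; they are not part of Flinkx, which builds its query from a template instead.

```java
import java.util.ArrayList;
import java.util.List;

public class SplitQuerySketch {

    /**
     * Builds one query per reader channel (hypothetical helper, not Flinkx API).
     * Each channel reads only the rows whose split key satisfies
     * splitPk mod channels = channelIndex, so the shards do not overlap.
     */
    static List<String> buildSplitQueries(String table, String splitPk, String where, int channels) {
        List<String> queries = new ArrayList<>();
        for (int index = 0; index < channels; index++) {
            StringBuilder sql = new StringBuilder("SELECT * FROM ")
                    .append(table)
                    .append(" WHERE ")
                    .append(splitPk).append(" mod ").append(channels)
                    .append(" = ").append(index);
            // The optional user filter is ANDed onto the shard predicate.
            if (where != null && !where.isEmpty()) {
                sql.append(" AND (").append(where).append(")");
            }
            queries.add(sql.toString());
        }
        return queries;
    }

    public static void main(String[] args) {
        // Values taken from the job configuration: table, splitPk, where, readerChannel.
        for (String query : buildSplitQueries("flink_test", "id", "id > 0", 3)) {
            System.out.println(query);
        }
    }
}
```

With three channels every row lands in exactly one shard, provided the split key is an integer column.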
SQL the Reader uses to pull data

    select * from data_test
    where id mod ${channel_num} = ${channel_index}
    # and id > ${offset}    on the first run this condition is absent
    # order by id           id must already be in ascending order, so this clause is not needed

    /**
     * Builds the query SQL.
     *
     * @param inputSplit the data split
     * @return the constructed SQL string
     */
    protected String buildQuerySql(InputSplit inputSplit) {
        // queryTemplate is built in QuerySqlBuilder
        String querySql = queryTemplate;

        if (inputSplit == null) {
            LOG.warn("inputSplit = null, Executing sql is: '{}'", querySql);
            return querySql;
        }

        JdbcInputSplit jdbcInputSplit = (JdbcInputSplit) inputSplit;

        if (StringUtils.isNotEmpty(splitKey)) {
            querySql = queryTemplate.replace("${N}", String.valueOf(numPartitions))
                    .replace("${M}", String.valueOf(indexOfSubTask));
        }

        // whether resuming from a checkpoint is enabled
        if (restoreConfig.isRestore()) {
            if (formatState == null) {
                querySql = querySql.replace(DbUtil.RESTORE_FILTER_PLACEHOLDER, StringUtils.EMPTY);

                if (incrementConfig.isIncrement()) {
                    querySql = buildIncrementSql(jdbcInputSplit, querySql);
                }
            } else {
                boolean useMaxFunc = incrementConfig.isUseMaxFunc();
                String startLocation = getLocation(restoreColumn.getType(), formatState.getState());
                if (StringUtils.isNotBlank(startLocation)) {
                    LOG.info("update startLocation, before = {}, after = {}",
                            jdbcInputSplit.getStartLocation(), startLocation);
                    jdbcInputSplit.setStartLocation(startLocation);
                    useMaxFunc = false;
                }
                String restoreFilter = buildIncrementFilter(restoreColumn.getType(),
                        restoreColumn.getName(),
                        jdbcInputSplit.getStartLocation(),
                        jdbcInputSplit.getEndLocation(),
                        customSql,
                        useMaxFunc);

                if (StringUtils.isNotEmpty(restoreFilter)) {
                    restoreFilter = " and " + restoreFilter;
                }

                querySql = querySql.replace(DbUtil.RESTORE_FILTER_PLACEHOLDER, restoreFilter);
            }

            querySql = querySql.replace(DbUtil.INCREMENT_FILTER_PLACEHOLDER, StringUtils.EMPTY);
        } else if (incrementConfig.isIncrement()) {
            querySql = buildIncrementSql(jdbcInputSplit, querySql);
        }

        LOG.warn("Executing sql is: '{}'", querySql);

        return querySql;
    }

Writer
    @Override
    protected void writeMultipleRecordsInternal() throws Exception {
        try {
            for (Row row : rows) {
                for (int index = 0; index < row.getArity(); index++) {
                    preparedStatement.setObject(index + 1, getField(row, index));
                }
                preparedStatement.addBatch();

                if (restoreConfig.isRestore()) {
                    if (lastRow != null) {
                        readyCheckpoint = !ObjectUtils.equals(
                                lastRow.getField(restoreConfig.getRestoreColumnIndex()),
                                row.getField(restoreConfig.getRestoreColumnIndex()));
                    }

                    lastRow = row;
                }
            }

            preparedStatement.executeBatch();

            if (restoreConfig.isRestore()) {
                rowsOfCurrentTransaction += rows.size();
            } else {
                // commit the transaction manually
                DbUtil.commit(dbConn);
            }
            preparedStatement.clearBatch();
        } catch (Exception e) {
            LOG.warn("write Multiple Records error, row size = {}, first row = {}, e = {}",
                    rows.size(),
                    rows.size() > 0 ? GsonUtil.GSON.toJson(rows.get(0)) : "null",
                    ExceptionUtil.getErrorMessage(e));
            LOG.warn("error to writeMultipleRecords, start to rollback connection, e = {}",
                    ExceptionUtil.getErrorMessage(e));
            DbUtil.rollBack(dbConn);
            throw e;
        } finally {
            // clear the batch once execution has finished
            preparedStatement.clearBatch();
        }
    }

Position
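In the writer code above, `readyCheckpoint` is recomputed for every row and becomes true only when the restore-column value differs from that of the previous row. One plausible reading, which is an assumption rather than something the source states, is that a snapshot taken between two rows with the same value could skip the second row on restart, because recovery filters on `id > ${offset}`. The sketch below reproduces only that comparison. `RestorePositionSketch` is a hypothetical class, and treating the last row's restore-column value as the saved position is likewise an assumption.

```java
import java.util.Objects;

public class RestorePositionSketch {
    private Object lastValue;         // restore-column value of the most recent row
    private boolean hasLast;          // false until the first row arrives
    private boolean readyCheckpoint;  // true when the last two rows differ in the restore column

    /** Feeds the restore-column value of the next row being written. */
    void accept(Object restoreValue) {
        if (hasLast) {
            // Same comparison as in the writer: the flag reflects the latest pair of rows only.
            readyCheckpoint = !Objects.equals(lastValue, restoreValue);
        }
        lastValue = restoreValue;
        hasLast = true;
    }

    boolean isReadyCheckpoint() {
        return readyCheckpoint;
    }

    /** Position that would be saved (assumed: the last row's restore-column value). */
    Object position() {
        return lastValue;
    }

    public static void main(String[] args) {
        RestorePositionSketch tracker = new RestorePositionSketch();
        for (long id : new long[] {1L, 2L, 2L}) {
            tracker.accept(id);
        }
        // The last two rows share id = 2, so the flag is false here.
        System.out.println(tracker.isReadyCheckpoint() + " at " + tracker.position());

        tracker.accept(3L);
        // The value moved from 2 to 3, so the flag is true here.
        System.out.println(tracker.isReadyCheckpoint() + " at " + tracker.position());
    }
}
```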
MySQL incremental sync
Data structure the Reader passes to the Writer
    {schema=rpl, after_id=222222, type=INSERT, after_value=2222222, table=t1, ts=6778672092931035136}
    {schema=rpl, type=DELETE, table=t1, before_value=2222222, ts=6778672106155675648, before_id=222222}
    {schema=rpl, after_id=222222, type=UPDATE, after_value=1, table=t1, before_value=2222222, ts=6778673333039927296, before_id=222222}

Position (checkpoint)
com/dtstack/flinkx/binlog/format/BinlogInputFormat.java

    @Override
    public FormatState getFormatState() {
        if (!restoreConfig.isRestore()) {
            LOG.info("return null for formatState");
            return null;
        }

        super.getFormatState();
        if (formatState != null) {
            formatState.setState(entryPosition);
        }
        return formatState;
    }

Concurrency
Flink task scheduling
Flink parallelism and operator chains
Multiple channels
Concurrency levels
    "speed": {
      "channel": 1,
      "readerChannel": 3,
      "writerChannel": 3,
      "bytes": 0
    }

Reader and Writer channel counts differ
Reader and Writer channel counts are the same
Incremental sync concurrency
Rate limiting
    "speed": {
      "channel": 1,
      "bytes": 0
    },

Failure recovery
Flink failure recovery
checkpoint
Recovery
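This mechanism guarantees Exactly-Once consistency for Flink's internal state. End-to-end Exactly-Once consistency, however, depends on how the Source and the Sink are implemented. When a failure occurs, some data may already have entered the system without yet being covered by a checkpoint. The Source's checkpoint records the input offset; on restart, Flink restores the most recent checkpoint into memory and, using that offset, has the Source resend the data from that position, so that data is neither lost nor duplicated. Message queues such as Kafka can resend data. socketTextStream cannot, which means it cannot provide an Exactly-Once delivery guarantee.

Exactly-Once
When a failure occurs
Transactional writes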
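The recovery paragraph above combines two ingredients: a Source that can rewind to a checkpointed offset, and a Sink whose uncommitted writes can be discarded. The toy simulation below is a sketch under simplifying assumptions (single thread, in-memory lists, commit exactly at each checkpoint). `ExactlyOnceSketch` and its parameters are hypothetical and do not use any Flink API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class ExactlyOnceSketch {

    /**
     * Runs a toy pipeline over the input.
     *
     * @param checkpointEvery number of records between checkpoints
     * @param failAt          offset at which one crash is injected, or -1 for no crash
     * @return the records that ended up committed in the sink
     */
    static List<String> run(List<String> input, int checkpointEvery, int failAt) {
        List<String> committed = new ArrayList<>(); // durable sink output
        List<String> pending = new ArrayList<>();   // writes of the open transaction
        int checkpointedOffset = 0;                 // source offset stored in the last checkpoint
        int offset = 0;                             // current source position
        boolean failed = false;

        while (offset < input.size()) {
            if (!failed && offset == failAt) {
                // Crash: the open transaction is rolled back and the
                // source rewinds to the offset of the last checkpoint.
                failed = true;
                pending.clear();
                offset = checkpointedOffset;
                continue;
            }
            pending.add(input.get(offset));
            offset++;
            if (offset % checkpointEvery == 0) {
                // Checkpoint: commit the transaction and record the offset together.
                committed.addAll(pending);
                pending.clear();
                checkpointedOffset = offset;
            }
        }
        committed.addAll(pending); // final commit at end of input
        return committed;
    }

    public static void main(String[] args) {
        List<String> input = Arrays.asList("a", "b", "c", "d", "e");
        // Crash after "c" was written but before it was committed.
        System.out.println(run(input, 2, 3)); // prints [a, b, c, d, e]
    }
}
```

If the sink appended directly to `committed` instead of buffering in `pending`, the replay after the crash would write `c` twice. This is why the guarantee depends on the Sink as much as on the Source.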
Flinkx snapshots
Example: MySQL to HDFS
After a checkpoint is triggered:
Flinkx failure recovery
Resource management and scheduling
https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java

    @Override
    public CompletableFuture<Acknowledge> requestSlot(
            JobMasterId jobMasterId, SlotRequest slotRequest, final Time timeout) {

        JobID jobId = slotRequest.getJobId();
        JobManagerRegistration jobManagerRegistration = jobManagerRegistrations.get(jobId);

        if (null != jobManagerRegistration) {
            if (Objects.equals(jobMasterId, jobManagerRegistration.getJobMasterId())) {
                log.info(
                        "Request slot with profile {} for job {} with allocation id {}.",
                        slotRequest.getResourceProfile(),
                        slotRequest.getJobId(),
                        slotRequest.getAllocationId());

                try {
                    slotManager.registerSlotRequest(slotRequest);
                } catch (ResourceManagerException e) {
                    return FutureUtils.completedExceptionally(e);
                }

                return CompletableFuture.completedFuture(Acknowledge.get());
            } else {
                return FutureUtils.completedExceptionally(
                        new ResourceManagerException(
                                "The job leader's id "
                                        + jobManagerRegistration.getJobMasterId()
                                        + " does not match the received id "
                                        + jobMasterId
                                        + '.'));
            }
        } else {
            return FutureUtils.completedExceptionally(
                    new ResourceManagerException(
                            "Could not find registered job manager for job " + jobId + '.'));
        }
    }

https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerImpl.java

    /**
     * Requests a slot with the respective resource profile.
     *
     * @param slotRequest specifying the requested slot specs
     * @return true if the slot request was registered; false if the request is a duplicate
     * @throws ResourceManagerException if the slot request failed (e.g. not enough resources left)
     */
    @Override
    public boolean registerSlotRequest(SlotRequest slotRequest) throws ResourceManagerException {
        checkInit();

        if (checkDuplicateRequest(slotRequest.getAllocationId())) {
            LOG.debug(
                    "Ignoring a duplicate slot request with allocation id {}.",
                    slotRequest.getAllocationId());

            return false;
        } else {
            PendingSlotRequest pendingSlotRequest = new PendingSlotRequest(slotRequest);

            pendingSlotRequests.put(slotRequest.getAllocationId(), pendingSlotRequest);

            try {
                internalRequestSlot(pendingSlotRequest);
            } catch (ResourceManagerException e) {
                // requesting the slot failed --> remove pending slot request
                pendingSlotRequests.remove(slotRequest.getAllocationId());

                throw new ResourceManagerException(
                        "Could not fulfill slot request " + slotRequest.getAllocationId() + '.', e);
            }

            return true;
        }
    }

    /**
     * Tries to allocate a slot for the given slot request. If there is no slot available, the
     * resource manager is informed to allocate more resources and a timeout for the request is
     * registered.
     *
     * @param pendingSlotRequest to allocate a slot for
     * @throws ResourceManagerException if the slot request failed or is unfulfillable
     */
    private void internalRequestSlot(PendingSlotRequest pendingSlotRequest)
            throws ResourceManagerException {
        final ResourceProfile resourceProfile = pendingSlotRequest.getResourceProfile();

        OptionalConsumer.of(findMatchingSlot(resourceProfile))
                .ifPresent(taskManagerSlot -> allocateSlot(taskManagerSlot, pendingSlotRequest))
                .ifNotPresent(
                        () ->
                                fulfillPendingSlotRequestWithPendingTaskManagerSlot(
                                        pendingSlotRequest));
    }

    /**
     * Finds a matching slot for a given resource profile. A matching slot has at least as many
     * resources available as the given resource profile. If there is no such slot available, then
     * the method returns null.
     *
     * <p>Note: If you want to change the behaviour of the slot manager wrt slot allocation and
     * request fulfillment, then you should override this method.
     *
     * @param requestResourceProfile specifying the resource requirements for the a slot request
     * @return A matching slot which fulfills the given resource profile. {@link Optional#empty()}
     *     if there is no such slot available.
     */
    private Optional<TaskManagerSlot> findMatchingSlot(ResourceProfile requestResourceProfile) {
        final Optional<TaskManagerSlot> optionalMatchingSlot =
                // findMatchingSlot of slotMatchingStrategy: the strategy that assigns a slot to the job.
                slotMatchingStrategy.findMatchingSlot(
                        requestResourceProfile,
                        freeSlots.values(),
                        this::getNumberRegisteredSlotsOf);

        optionalMatchingSlot.ifPresent(
                taskManagerSlot -> {
                    // sanity check
                    Preconditions.checkState(
                            taskManagerSlot.getState() == SlotState.FREE,
                            "TaskManagerSlot %s is not in state FREE but %s.",
                            taskManagerSlot.getSlotId(),
                            taskManagerSlot.getState());

                    freeSlots.remove(taskManagerSlot.getSlotId());
                });

        return optionalMatchingSlot;
    }

AnyMatchingSlotMatchingStrategy

https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/AnyMatchingSlotMatchingStrategy.java

    /** {@link SlotMatchingStrategy} which picks the first matching slot. */
    // Picks any one slot that satisfies the requirement.
    public enum AnyMatchingSlotMatchingStrategy implements SlotMatchingStrategy {
        INSTANCE;

        @Override
        public <T extends TaskManagerSlotInformation> Optional<T> findMatchingSlot(
                ResourceProfile requestedProfile,
                Collection<T> freeSlots,
                Function<InstanceID, Integer> numberRegisteredSlotsLookup) {

            return freeSlots.stream()
                    .filter(slot -> slot.isMatchingRequirement(requestedProfile))
                    .findAny();
        }
    }

LeastUtilizationSlotMatchingStrategy

Among all workers, this strategy picks the one with the lowest utilization, that is, the largest share of free slots among its registered slots, and then takes one of that worker's slots.

https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/LeastUtilizationSlotMatchingStrategy.java

    /**
     * {@link SlotMatchingStrategy} which picks a matching slot from a TaskExecutor with the least
     * utilization.
     */
    public enum LeastUtilizationSlotMatchingStrategy implements SlotMatchingStrategy {
        INSTANCE;

        @Override
        public <T extends TaskManagerSlotInformation> Optional<T> findMatchingSlot(
                ResourceProfile requestedProfile,
                Collection<T> freeSlots,
                Function<InstanceID, Integer> numberRegisteredSlotsLookup) {
            final Map<InstanceID, Integer> numSlotsPerTaskExecutor =
                    freeSlots.stream()
                            .collect(
                                    Collectors.groupingBy(
                                            TaskManagerSlotInformation::getInstanceId,
                                            Collectors.reducing(0, i -> 1, Integer::sum)));

            return freeSlots.stream()
                    .filter(taskManagerSlot -> taskManagerSlot.isMatchingRequirement(requestedProfile))
                    .min(
                            Comparator.comparingDouble(
                                    taskManagerSlot ->
                                            calculateUtilization(
                                                    taskManagerSlot.getInstanceId(),
                                                    numberRegisteredSlotsLookup,
                                                    numSlotsPerTaskExecutor)));
        }

        private static double calculateUtilization(
                InstanceID instanceId,
                Function<? super InstanceID, Integer> numberRegisteredSlotsLookup,
                Map<InstanceID, Integer> numSlotsPerTaskExecutor) {
            final int numberRegisteredSlots = numberRegisteredSlotsLookup.apply(instanceId);

            Preconditions.checkArgument(
                    numberRegisteredSlots > 0,
                    "The TaskExecutor %s has no slots registered.",
                    instanceId);

            final int numberFreeSlots = numSlotsPerTaskExecutor.getOrDefault(instanceId, 0);

            Preconditions.checkArgument(
                    numberRegisteredSlots >= numberFreeSlots,
                    "The TaskExecutor %s has fewer registered slots than free slots.",
                    instanceId);

            return (double) (numberRegisteredSlots - numberFreeSlots) / numberRegisteredSlots;
        }
    }

    /**
     * Allocates the given slot for the given slot request. This entails sending a registration
     * message to the task manager and treating failures.
     *
     * @param taskManagerSlot to allocate for the given slot request
     * @param pendingSlotRequest to allocate the given slot for
     */
    private void allocateSlot(
            TaskManagerSlot taskManagerSlot, PendingSlotRequest pendingSlotRequest) {

CPU
A TaskManager has 1 to n slots, and all of its slots share CPU time slices.
Memory
Comparison with Jingwei (精卫)
Flink CDC
https://blog.csdn.net/young_0609/article/details/108412891
https://zhuanlan.zhihu.com/p/274492805?utm_source=wechat_session

References
Flinkx official Git repository: https://github.com/DTStack/flinkx/blob/1.10_release/README_CH.md
Flink official documentation: https://ci.apache.org/projects/flink/flink-docs-stable/zh/
Flink task scheduling: parallelism and operator chains: https://dragonlsl.blog.csdn.net/article/details/106005264
Flink task scheduling: TaskManager and slots: https://dragonlsl.blog.csdn.net/article/details/105823127
Flink task scheduling: logical dataflow and execution graph: https://dragonlsl.blog.csdn.net/article/details/105957694
Flinkx failure recovery: https://github.com/DTStack/flinkx/blob/1.10_release/docs/restore.md
Flink custom Source and Sink: https://cloud.tencent.com/developer/article/1694062
Flink checkpoint mechanism and distributed snapshots: https://zhuanlan.zhihu.com/p/104601440
How Flink guarantees Exactly-Once: https://zhuanlan.zhihu.com/p/108570573
A brief look at the Flink distributed runtime and the parallelization of the dataflow graph: https://zhuanlan.zhihu.com/p/107336730
Flink end-to-end Exactly-once: https://zhuanlan.zhihu.com/p/68797265
Flink resource management: https://zhuanlan.zhihu.com/p/149144610
Eight diagrams to understand Flink end-to-end Exactly-once processing semantics: https://zhuanlan.zhihu.com/p/348559815
Big data architecture resource collection: https://github.com/lhh2002/Framework-Of-BigData
An in-depth look at Flink's resource management mechanism: https://www.jianshu.com/p/726bef539ea8
Configuring TaskManager memory: https://ci.apache.org/projects/flink/flink-docs-master/zh/docs/deployment/memory/mem_setup_tm/
TaskManager memory model: https://zhuanlan.zhihu.com/p/340345588
A thorough guide to memory management in the Flink big data processing engine: https://www.cnblogs.com/huaweiyun/p/14142780.html
Flink principles and implementation: memory management: http://wuchong.me/blog/2016/04/29/flink-internals-memory-manage/
TaskManager slots split managed memory equally: https://stackoverflow.com/questions/47197744/confused-about-flink-task-slot
Flink's memory model: https://blog.csdn.net/zc19921215/article/details/106843097
What exactly is a Flink slot: https://cloud.tencent.com/developer/article/1693328
Flink TaskManager slot calculation strategy: https://www.yuque.com/aitozi/blog/gv2wdv?language=en-us
MySQL snapshot reads:
A real-time data synchronization solution based on Flink SQL CDC: https://zhuanlan.zhihu.com/p/274492805?utm_source=wechat_session
Flink SQL CDC: 13 lessons from production practice: https://blog.csdn.net/young_0609/article/details/108412891
Flink Managed Memory:
Flink cluster operations: https://zhuanlan.zhihu.com/p/99755112
Flink in monitoring systems, practice and applications: https://www.infoq.cn/article/rajdtrlks*aglvjtv7ah