[Big Data] Flinkx
Architecture
Supported task types
Basic principles
Deployment
Task configuration
{
  "job": {
    "content": [
      {
        "reader": {
          "parameter": {
            "username": "root",
            "password": "123456",
            "fetchSize": 10,
            "connection": [{
              "jdbcUrl": ["jdbc:mysql://127.0.0.1:3306/rpl?useUnicode=true&characterEncoding=utf8"],
              "table": ["flink_test"]
            }],
            "column": ["id","name","age"],
            "customSql": "",
            "where": "id > 0",
            "splitPk": "id",
            "startLocation": "20",
            "queryTimeOut": 1000,
            "requestAccumulatorInterval": 2
          },
          "name": "mysqlreader"
        },
        "writer": {
          "name": "mysqlwriter",
          "parameter": {
            "username": "root",
            "password": "123456",
            "connection": [
              {
                "jdbcUrl": "jdbc:mysql://127.0.0.1:3306/rpl?useUnicode=true&characterEncoding=utf8",
                "table": [
                  "flink_test_dst"
                ]
              }
            ],
            // "preSql": ["truncate table flink_test_dst;"],
            "postSql": [],
            "writeMode": "replace",
            "column": [
              {
                "name": "id",
                "type": "BIGINT"
              },
              {
                "name": "name",
                "type": "varchar"
              },
              {
                "name": "age",
                "type": "INT"
              }
            ]
          }
        }
      }
    ],
    "setting": {
      "speed": {
        // "channel": 1,
        "readerChannel": 3,
        "writerChannel": 3,
        "bytes": 0
      },
      "errorLimit": {
        "record": 100
      },
      "restore": {
        "maxRowNumForCheckpoint": 10,
        "isRestore": true,
        "restoreColumnName": "id",
        "restoreColumnIndex": 0
      },
      "log": {
        "isLogger": false,
        "level": "debug",
        "path": "",
        "pattern": ""
      }
    }
  }
}
MySQL to MySQL: full sync
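Reader
SQL the Reader uses to pull data:
select * from data_test
where id mod ${channel_num} = ${channel_index}
# and id > ${offset}   (this condition is absent on the first run)
# order by id          (id is required to be in ascending order, so this clause is not needed)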
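The per-channel query above can be sketched as a small string builder. This is only an illustration under stated assumptions: the class `ChannelSplitSketch` and its methods are hypothetical names, not Flinkx APIs, and ids are assumed to be non-negative so that `id mod N` maps every row to exactly one channel.

```java
final class ChannelSplitSketch {

    /** Builds the query for one channel; a null offset models a first run with no saved position. */
    static String buildSql(String table, String splitKey, int channelNum, int channelIndex, Long offset) {
        StringBuilder sql = new StringBuilder("select * from ").append(table)
                .append(" where ").append(splitKey)
                .append(" mod ").append(channelNum)
                .append(" = ").append(channelIndex);
        if (offset != null) {
            // Only added when resuming from a previously saved position.
            sql.append(" and ").append(splitKey).append(" > ").append(offset);
        }
        return sql.toString();
    }

    /** Returns the index of the channel that reads the given id (non-negative ids assumed). */
    static int channelOf(long id, int channelNum) {
        return (int) (id % channelNum);
    }

    public static void main(String[] args) {
        int channelNum = 3;
        // First run: one query per channel, no offset condition.
        for (int index = 0; index < channelNum; index++) {
            System.out.println(buildSql("data_test", "id", channelNum, index, null));
        }
        // Resumed run for channel 1, starting after id 20.
        System.out.println(buildSql("data_test", "id", channelNum, 1, 20L));
    }
}
```

Because each id has exactly one remainder modulo the channel count, the channels read disjoint subsets whose union is the whole table.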
/**
 * Builds the query SQL.
 *
 * @param inputSplit the data split
 * @return the constructed SQL string
 */
protected String buildQuerySql(InputSplit inputSplit) {
    // queryTemplate is built in QuerySqlBuilder
    String querySql = queryTemplate;
    if (inputSplit == null) {
        LOG.warn("inputSplit = null, Executing sql is: '{}'", querySql);
        return querySql;
    }
    JdbcInputSplit jdbcInputSplit = (JdbcInputSplit) inputSplit;
    if (StringUtils.isNotEmpty(splitKey)) {
        querySql = queryTemplate.replace("${N}", String.valueOf(numPartitions)).replace("${M}", String.valueOf(indexOfSubTask));
    }
    // Is resuming from a checkpoint enabled?
    if (restoreConfig.isRestore()) {
        if (formatState == null) {
            querySql = querySql.replace(DbUtil.RESTORE_FILTER_PLACEHOLDER, StringUtils.EMPTY);
            if (incrementConfig.isIncrement()) {
                querySql = buildIncrementSql(jdbcInputSplit, querySql);
            }
        } else {
            boolean useMaxFunc = incrementConfig.isUseMaxFunc();
            String startLocation = getLocation(restoreColumn.getType(), formatState.getState());
            if (StringUtils.isNotBlank(startLocation)) {
                LOG.info("update startLocation, before = {}, after = {}", jdbcInputSplit.getStartLocation(), startLocation);
                jdbcInputSplit.setStartLocation(startLocation);
                useMaxFunc = false;
            }
            String restoreFilter = buildIncrementFilter(restoreColumn.getType(),
                    restoreColumn.getName(),
                    jdbcInputSplit.getStartLocation(),
                    jdbcInputSplit.getEndLocation(),
                    customSql,
                    useMaxFunc);
            if (StringUtils.isNotEmpty(restoreFilter)) {
                restoreFilter = " and " + restoreFilter;
            }
            querySql = querySql.replace(DbUtil.RESTORE_FILTER_PLACEHOLDER, restoreFilter);
        }
        querySql = querySql.replace(DbUtil.INCREMENT_FILTER_PLACEHOLDER, StringUtils.EMPTY);
    } else if (incrementConfig.isIncrement()) {
        querySql = buildIncrementSql(jdbcInputSplit, querySql);
    }
    LOG.warn("Executing sql is: '{}'", querySql);
    return querySql;
}
Writer
@Override
protected void writeMultipleRecordsInternal() throws Exception {
    try {
        for (Row row : rows) {
            for (int index = 0; index < row.getArity(); index++) {
                preparedStatement.setObject(index + 1, getField(row, index));
            }
            preparedStatement.addBatch();
            if (restoreConfig.isRestore()) {
                if (lastRow != null) {
                    readyCheckpoint = !ObjectUtils.equals(lastRow.getField(restoreConfig.getRestoreColumnIndex()),
                            row.getField(restoreConfig.getRestoreColumnIndex()));
                }
                lastRow = row;
            }
        }
        preparedStatement.executeBatch();
        if (restoreConfig.isRestore()) {
            rowsOfCurrentTransaction += rows.size();
        } else {
            // Commit the transaction manually
            DbUtil.commit(dbConn);
        }
        preparedStatement.clearBatch();
    } catch (Exception e) {
        LOG.warn("write Multiple Records error, row size = {}, first row = {}, e = {}",
                rows.size(),
                rows.size() > 0 ? GsonUtil.GSON.toJson(rows.get(0)) : "null",
                ExceptionUtil.getErrorMessage(e));
        LOG.warn("error to writeMultipleRecords, start to rollback connection, e = {}", ExceptionUtil.getErrorMessage(e));
        DbUtil.rollBack(dbConn);
        throw e;
    } finally {
        // Clear the batch once execution has finished
        preparedStatement.clearBatch();
    }
}
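The `readyCheckpoint` bookkeeping in the writer above can be isolated into a small sketch. The class `ReadyCheckpointSketch` is a hypothetical stand-in that uses plain `Object[]` rows instead of Flink `Row` objects; it only mirrors the comparison of the restore column between the last two rows. Presumably the intent is that a checkpoint is not taken between rows that share the same restore-column value, but that reading is an assumption, not something the source states.

```java
import java.util.Objects;

final class ReadyCheckpointSketch {
    private final int restoreColumnIndex;
    private Object[] lastRow;
    private boolean readyCheckpoint;

    ReadyCheckpointSketch(int restoreColumnIndex) {
        this.restoreColumnIndex = restoreColumnIndex;
    }

    /** Updates the flag: true only if the restore column changed between the previous row and this one. */
    void accept(Object[] row) {
        if (lastRow != null) {
            readyCheckpoint = !Objects.equals(lastRow[restoreColumnIndex], row[restoreColumnIndex]);
        }
        lastRow = row;
    }

    boolean isReadyCheckpoint() {
        return readyCheckpoint;
    }

    public static void main(String[] args) {
        ReadyCheckpointSketch writer = new ReadyCheckpointSketch(0);
        Object[][] rows = { {1L, "a"}, {1L, "b"}, {2L, "c"}, {2L, "d"} };
        for (Object[] row : rows) {
            writer.accept(row);
            System.out.println("id=" + row[0] + " readyCheckpoint=" + writer.isReadyCheckpoint());
        }
    }
}
```

Note that the flag reflects only the two most recent rows, so it flips back to false as soon as two consecutive rows share the same restore-column value.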
Position
MySQL incremental sync
Data structure passed from the Reader to the Writer
{schema=rpl, after_id=222222, type=INSERT, after_value=2222222, table=t1, ts=6778672092931035136}
{schema=rpl, type=DELETE, table=t1, before_value=2222222, ts=6778672106155675648, before_id=222222}
{schema=rpl, after_id=222222, type=UPDATE, after_value=1, table=t1, before_value=2222222, ts=6778673333039927296, before_id=222222}
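To make the shape of these records concrete, here is a minimal sketch that parses the printed form into a map and separates the `before_` and `after_` columns. `ChangeRecordSketch` is a hypothetical helper, not part of Flinkx, and it assumes that values never contain `", "` or `"="`, which holds for the three samples above but not in general.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class ChangeRecordSketch {

    /** Parses text such as "{schema=rpl, type=INSERT, ...}" into an ordered key/value map. */
    static Map<String, String> parse(String text) {
        String body = text.trim();
        if (body.startsWith("{") && body.endsWith("}")) {
            body = body.substring(1, body.length() - 1);
        }
        Map<String, String> fields = new LinkedHashMap<>();
        for (String pair : body.split(", ")) {
            int eq = pair.indexOf('=');
            if (eq > 0) {
                fields.put(pair.substring(0, eq), pair.substring(eq + 1));
            }
        }
        return fields;
    }

    /** Extracts the columns carrying the given prefix ("before_" or "after_"), with the prefix removed. */
    static Map<String, String> columns(Map<String, String> record, String prefix) {
        Map<String, String> out = new LinkedHashMap<>();
        for (Map.Entry<String, String> entry : record.entrySet()) {
            if (entry.getKey().startsWith(prefix)) {
                out.put(entry.getKey().substring(prefix.length()), entry.getValue());
            }
        }
        return out;
    }

    public static void main(String[] args) {
        Map<String, String> record = parse(
                "{schema=rpl, after_id=222222, type=UPDATE, after_value=1, table=t1, "
                        + "before_value=2222222, ts=6778673333039927296, before_id=222222}");
        System.out.println(record.get("type") + " on " + record.get("schema") + "." + record.get("table"));
        System.out.println("before: " + columns(record, "before_"));
        System.out.println("after:  " + columns(record, "after_"));
    }
}
```

An INSERT carries only `after_` columns and a DELETE only `before_` columns, so one of the two extracted maps is empty for those record types.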
Position (checkpoint)
com/dtstack/flinkx/binlog/format/BinlogInputFormat.java
@Override
public FormatState getFormatState() {
    if (!restoreConfig.isRestore()) {
        LOG.info("return null for formatState");
        return null;
    }
    super.getFormatState();
    if (formatState != null) {
        formatState.setState(entryPosition);
    }
    return formatState;
}
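Concurrency
Flink task scheduling
Flink parallelism and operator chains
Multiple channels
Concurrency levels
"speed": {
  "channel": 1,
  "readerChannel": 3,
  "writerChannel": 3,
  "bytes": 0
}
Reader and Writer channel counts differ
Reader and Writer channel counts are the same
Incremental-sync concurrency
Rate limiting
"speed": {
  "channel": 1,
  "bytes": 0
},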
Failure recovery
Flink fault recovery
Checkpoint
Recovery
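This mechanism guarantees Exactly-Once consistency for Flink's internal state. End-to-end Exactly-Once consistency depends on how the Source and the Sink are implemented. When a failure occurs, some data may already have entered the system without having been checkpointed; the Source's checkpoint records the input offset. On restart, Flink restores the most recent checkpoint into memory and has the Source resend data starting from that offset, so that data is neither lost nor duplicated. Message queues such as Kafka support this kind of replay; socketTextStream does not, which means it cannot provide an Exactly-Once delivery guarantee.
Exactly-Once
When a failure occurs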
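The replay behavior described above can be illustrated with a toy simulation. This is not Flink code: `ReplaySketch`, its `Checkpoint` class, and the parameters `checkpointInterval` and `failAt` are all hypothetical. The source is a plain list that can be re-read from any offset (standing in for a replayable source such as Kafka), and the operator state is a running sum that is snapshotted together with the offset.

```java
import java.util.Arrays;
import java.util.List;

final class ReplaySketch {

    /** Snapshot of the operator state together with the source offset it corresponds to. */
    static final class Checkpoint {
        final int offset;
        final long sum;

        Checkpoint(int offset, long sum) {
            this.offset = offset;
            this.sum = sum;
        }
    }

    /**
     * Sums all records, taking a checkpoint every checkpointInterval records and crashing once
     * just before the record at index failAt is read (pass -1 for a run without failure).
     */
    static long run(List<Integer> source, int checkpointInterval, int failAt) {
        Checkpoint last = new Checkpoint(0, 0L);
        int offset = 0;
        long sum = 0L;
        boolean failed = false;
        while (offset < source.size()) {
            if (!failed && offset == failAt) {
                // Simulated crash: progress since the last checkpoint is discarded,
                // and both the state and the read offset are restored from it.
                failed = true;
                offset = last.offset;
                sum = last.sum;
                continue;
            }
            sum += source.get(offset);
            offset++;
            if (offset % checkpointInterval == 0) {
                last = new Checkpoint(offset, sum);
            }
        }
        return sum;
    }

    public static void main(String[] args) {
        List<Integer> source = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
        System.out.println("without failure: " + run(source, 4, -1));
        System.out.println("with failure:    " + run(source, 4, 6));
    }
}
```

Both runs end with the same sum because the records read after the last checkpoint are replayed against the restored state. The sketch covers internal state only: anything an external sink had already received before the crash would be sent again, which is why end-to-end guarantees depend on the Sink.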
Transactional writes
Flinkx snapshots
Example: MySQL to HDFS
After a checkpoint is triggered:
Flinkx failure recovery
Resource management and scheduling
https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
@Override
public CompletableFuture<Acknowledge> requestSlot(
JobMasterId jobMasterId, SlotRequest slotRequest, final Time timeout) {
JobID jobId = slotRequest.getJobId();
JobManagerRegistration jobManagerRegistration = jobManagerRegistrations.get(jobId);
if (null != jobManagerRegistration) {
if (Objects.equals(jobMasterId, jobManagerRegistration.getJobMasterId())) {
log.info(
"Request slot with profile {} for job {} with allocation id {}.",
slotRequest.getResourceProfile(),
slotRequest.getJobId(),
slotRequest.getAllocationId());
try {
slotManager.registerSlotRequest(slotRequest);
} catch (ResourceManagerException e) {
return FutureUtils.completedExceptionally(e);
}
return CompletableFuture.completedFuture(Acknowledge.get());
} else {
return FutureUtils.completedExceptionally(
new ResourceManagerException(
"The job leader's id "
+ jobManagerRegistration.getJobMasterId()
+ " does not match the received id "
+ jobMasterId
+ '.'));
}
} else {
return FutureUtils.completedExceptionally(
new ResourceManagerException(
"Could not find registered job manager for job " + jobId + '.'));
}
}
https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerImpl.java
/**
* Requests a slot with the respective resource profile.
*
* @param slotRequest specifying the requested slot specs
* @return true if the slot request was registered; false if the request is a duplicate
* @throws ResourceManagerException if the slot request failed (e.g. not enough resources left)
*/
@Override
public boolean registerSlotRequest(SlotRequest slotRequest) throws ResourceManagerException {
checkInit();
if (checkDuplicateRequest(slotRequest.getAllocationId())) {
LOG.debug(
"Ignoring a duplicate slot request with allocation id {}.",
slotRequest.getAllocationId());
return false;
} else {
PendingSlotRequest pendingSlotRequest = new PendingSlotRequest(slotRequest);
pendingSlotRequests.put(slotRequest.getAllocationId(), pendingSlotRequest);
try {
internalRequestSlot(pendingSlotRequest);
} catch (ResourceManagerException e) {
// requesting the slot failed --> remove pending slot request
pendingSlotRequests.remove(slotRequest.getAllocationId());
throw new ResourceManagerException(
"Could not fulfill slot request " + slotRequest.getAllocationId() + '.', e);
}
return true;
}
}
/**
* Tries to allocate a slot for the given slot request. If there is no slot available, the
* resource manager is informed to allocate more resources and a timeout for the request is
* registered.
*
* @param pendingSlotRequest to allocate a slot for
* @throws ResourceManagerException if the slot request failed or is unfulfillable
*/
private void internalRequestSlot(PendingSlotRequest pendingSlotRequest)
throws ResourceManagerException {
final ResourceProfile resourceProfile = pendingSlotRequest.getResourceProfile();
OptionalConsumer.of(findMatchingSlot(resourceProfile))
.ifPresent(taskManagerSlot -> allocateSlot(taskManagerSlot, pendingSlotRequest))
.ifNotPresent(
() ->
fulfillPendingSlotRequestWithPendingTaskManagerSlot(
pendingSlotRequest));
}
/**
* Finds a matching slot for a given resource profile. A matching slot has at least as many
* resources available as the given resource profile. If there is no such slot available, then
* the method returns null.
*
* <p>Note: If you want to change the behaviour of the slot manager wrt slot allocation and
* request fulfillment, then you should override this method.
*
* @param requestResourceProfile specifying the resource requirements for the a slot request
* @return A matching slot which fulfills the given resource profile. {@link Optional#empty()}
* if there is no such slot available.
*/
private Optional<TaskManagerSlot> findMatchingSlot(ResourceProfile requestResourceProfile) {
final Optional<TaskManagerSlot> optionalMatchingSlot =
// slotMatchingStrategy.findMatchingSlot implements the policy for assigning a slot to a job.
slotMatchingStrategy.findMatchingSlot(
requestResourceProfile,
freeSlots.values(),
this::getNumberRegisteredSlotsOf);
optionalMatchingSlot.ifPresent(
taskManagerSlot -> {
// sanity check
Preconditions.checkState(
taskManagerSlot.getState() == SlotState.FREE,
"TaskManagerSlot %s is not in state FREE but %s.",
taskManagerSlot.getSlotId(),
taskManagerSlot.getState());
freeSlots.remove(taskManagerSlot.getSlotId());
});
return optionalMatchingSlot;
}
AnyMatchingSlotMatchingStrategy
https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/AnyMatchingSlotMatchingStrategy.java
/** {@link SlotMatchingStrategy} which picks the first matching slot. */
// Picks any slot that satisfies the requirement
public enum AnyMatchingSlotMatchingStrategy implements SlotMatchingStrategy {
INSTANCE;
@Override
public <T extends TaskManagerSlotInformation> Optional<T> findMatchingSlot(
ResourceProfile requestedProfile,
Collection<T> freeSlots,
Function<InstanceID, Integer> numberRegisteredSlotsLookup) {
return freeSlots.stream()
.filter(slot -> slot.isMatchingRequirement(requestedProfile))
.findAny();
}
}
LeastUtilizationSlotMatchingStrategy
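// Among all workers (TaskExecutors), picks the one with the lowest utilization, i.e. the largest fraction of its registered slots still free, and takes a matching slot from that worker.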
https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/LeastUtilizationSlotMatchingStrategy.java
/**
* {@link SlotMatchingStrategy} which picks a matching slot from a TaskExecutor with the least
* utilization.
*/
public enum LeastUtilizationSlotMatchingStrategy implements SlotMatchingStrategy {
INSTANCE;
@Override
public <T extends TaskManagerSlotInformation> Optional<T> findMatchingSlot(
ResourceProfile requestedProfile,
Collection<T> freeSlots,
Function<InstanceID, Integer> numberRegisteredSlotsLookup) {
final Map<InstanceID, Integer> numSlotsPerTaskExecutor =
freeSlots.stream()
.collect(
Collectors.groupingBy(
TaskManagerSlotInformation::getInstanceId,
Collectors.reducing(0, i -> 1, Integer::sum)));
return freeSlots.stream()
.filter(taskManagerSlot -> taskManagerSlot.isMatchingRequirement(requestedProfile))
.min(
Comparator.comparingDouble(
taskManagerSlot ->
calculateUtilization(
taskManagerSlot.getInstanceId(),
numberRegisteredSlotsLookup,
numSlotsPerTaskExecutor)));
}
private static double calculateUtilization(
InstanceID instanceId,
Function<? super InstanceID, Integer> numberRegisteredSlotsLookup,
Map<InstanceID, Integer> numSlotsPerTaskExecutor) {
final int numberRegisteredSlots = numberRegisteredSlotsLookup.apply(instanceId);
Preconditions.checkArgument(
numberRegisteredSlots > 0,
"The TaskExecutor %s has no slots registered.",
instanceId);
final int numberFreeSlots = numSlotsPerTaskExecutor.getOrDefault(instanceId, 0);
Preconditions.checkArgument(
numberRegisteredSlots >= numberFreeSlots,
"The TaskExecutor %s has fewer registered slots than free slots.",
instanceId);
return (double) (numberRegisteredSlots - numberFreeSlots) / numberRegisteredSlots;
}
}
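A worked example may help show what "least utilization" means in the strategy above. The sketch below is a simplified, self-contained imitation, not the Flink classes: `LeastUtilizationSketch` and its `Slot` type are hypothetical, workers are identified by plain strings, and resource-profile matching is left out. It applies the same formula, (registered slots - free slots) / registered slots.

```java
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

final class LeastUtilizationSketch {

    /** A free slot, identified by the worker that owns it. */
    static final class Slot {
        final String worker;
        final int index;

        Slot(String worker, int index) {
            this.worker = worker;
            this.index = index;
        }
    }

    /** Fraction of a worker's registered slots that are currently in use. */
    static double utilization(int registered, int free) {
        return (double) (registered - free) / registered;
    }

    /** Picks a free slot on the worker with the lowest utilization. */
    static Optional<Slot> pick(List<Slot> freeSlots, Map<String, Integer> registeredPerWorker) {
        Map<String, Integer> freePerWorker = new HashMap<>();
        for (Slot slot : freeSlots) {
            freePerWorker.merge(slot.worker, 1, Integer::sum);
        }
        return freeSlots.stream().min(Comparator.comparingDouble(
                (Slot slot) -> utilization(registeredPerWorker.get(slot.worker), freePerWorker.get(slot.worker))));
    }

    public static void main(String[] args) {
        // Worker A: 10 registered, 3 free -> utilization 0.7
        // Worker B:  2 registered, 1 free -> utilization 0.5
        Map<String, Integer> registered = new HashMap<>();
        registered.put("A", 10);
        registered.put("B", 2);
        List<Slot> free = Arrays.asList(new Slot("A", 0), new Slot("A", 1), new Slot("A", 2), new Slot("B", 0));
        pick(free, registered).ifPresent(slot -> System.out.println("picked worker " + slot.worker));
    }
}
```

In this setup worker A has more free slots in absolute terms, yet worker B is chosen because a smaller share of its slots is in use. The two criteria coincide only when all workers register the same number of slots.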
/**
* Allocates the given slot for the given slot request. This entails sending a registration
* message to the task manager and treating failures.
*
* @param taskManagerSlot to allocate for the given slot request
* @param pendingSlotRequest to allocate the given slot for
*/
private void allocateSlot(
TaskManagerSlot taskManagerSlot, PendingSlotRequest pendingSlotRequest) {
CPU
One TaskManager hosts 1 to n slots, and all of its slots share CPU time slices.
Memory
Comparison with Jingwei (精卫)
Flink CDC
https://blog.csdn.net/young_0609/article/details/108412891
https://zhuanlan.zhihu.com/p/274492805?utm_source=wechat_session