IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> Flinkx -> 正文阅读

[大数据]Flinkx

架构

  • 基于flink的分布式离线和实时的数据同步框架,实现了多种异构数据源之间高效的数据迁移。
  • 不同的数据源头被抽象成不同的Reader插件(Source),不同的数据目标被抽象成不同的Writer插件(Sink)。
  • 每接入一套新数据源该新加入的数据源即可实现和现有的数据源互通。
  • 所有任务被包装成 job 提交给 Flink 运行,流处理。

支持任务类型

  • 全量传输
  • 关系型数据库实时增量(读取 binlog)
  • 间隔轮询(定时,可只针对增量)

基本原理

?

  • Flinkx 本质是一个 override 的 Flink client。
  • 数据同步任务会被翻译成 StreamGraph 在 Flink 上执行。
  • Reader 和 Writer 会被翻译成不同的 Flink Operator。
  • Reader = Flink Source。
  • Writer = Flink Sink。

部署

  • Local 模式:会启动一个 embed Flink。
  • Flink 集群。

任务配置

  • 每次提交是一个 job
  • reader 配置
  • writer 配置
  • 通道配置(channel,speed)
  • checkpoint 配置
{
  "job": {
    "content": [
      {
        "reader": {
          "parameter": {
            "username": "root",
            "password": "123456",
            "fetchSize": 10,
            "connection": [{
              "jdbcUrl": ["jdbc:mysql://127.0.0.1:3306/rpl?useUnicode=true&characterEncoding=utf8"],
              "table": ["flink_test"]
            }],
            "column": ["id","name","age"],
            "customSql": "",
            "where": "id > 0",
            "splitPk": "id",
            "startLocation": "20",
            "queryTimeOut": 1000,
            "requestAccumulatorInterval": 2
          },
          "name": "mysqlreader"
        },
        "writer": {
          "name": "mysqlwriter",
          "parameter": {
            "username": "root",
            "password": "123456",
            "connection": [
              {
                "jdbcUrl": "jdbc:mysql://127.0.0.1:3306/rpl?useUnicode=true&characterEncoding=utf8",
                "table": [
                  "flink_test_dst"
                ]
              }
            ],
            //            "preSql": ["truncate table flink_test_dst;"],
            "postSql": [],
            "writeMode": "replace",
            "column": [
              {
                "name": "id",
                "type": "BIGINT"
              },
              {
                "name": "name",
                "type": "varchar"
              },
              {
                "name": "age",
                "type": "INT"
              }
            ]
          }
        }
      }
    ],
    "setting": {
      "speed": {
//        "channel": 1,
        "readerChannel": 3,
        "writerChannel": 3,
        "bytes": 0
      },
      "errorLimit": {
        "record": 100
      },
      "restore": {
        "maxRowNumForCheckpoint": 10,
        "isRestore": true,
        "restoreColumnName": "id",
        "restoreColumnIndex": 0
      },
      "log" : {
        "isLogger": false,
        "level" : "debug",
        "path" : "",
        "pattern":""
      }
    }
  }
}

Mysql To Mysql 全量

  • 每张表构造 SELECT sql 拉取数据。
  • fetchSize:拉取批量大小,默认拉全部。
  • restoreColumnName: 记录 checkpoint 的列,用于断点续传。
  • startLocation: 设置起点(对应 restoreColumnName)。
  • 支持自定义 where 条件过滤(类似精卫)。
  • ?writeMode:insert / replace。

Reader

拉取数据的 sql

select * from data_test
where id mod ${channel_num}=${channel_index}
# and id > ${offset}  如果是第一次运行,则此条件不存在
# order by id  要求 id 必须升序排列,故此条件不需要
/**
     * 构造查询sql
     *
     * @param inputSplit 数据切片
     * @return 构建的sql字符串
     */
protected String buildQuerySql(InputSplit inputSplit) {
    //QuerySqlBuilder中构建的queryTemplate
    String querySql = queryTemplate;

    if (inputSplit == null) {
        LOG.warn("inputSplit = null, Executing sql is: '{}'", querySql);
        return querySql;
    }

    JdbcInputSplit jdbcInputSplit = (JdbcInputSplit) inputSplit;

    if (StringUtils.isNotEmpty(splitKey)) {
        querySql = queryTemplate.replace("${N}", String.valueOf(numPartitions)).replace("${M}", String.valueOf(indexOfSubTask));
    }

    //是否开启断点续传
    if (restoreConfig.isRestore()) {
        if (formatState == null) {
            querySql = querySql.replace(DbUtil.RESTORE_FILTER_PLACEHOLDER, StringUtils.EMPTY);

            if (incrementConfig.isIncrement()) {
                querySql = buildIncrementSql(jdbcInputSplit, querySql);
            }
        } else {
            boolean useMaxFunc = incrementConfig.isUseMaxFunc();
            String startLocation = getLocation(restoreColumn.getType(), formatState.getState());
            if (StringUtils.isNotBlank(startLocation)) {
                LOG.info("update startLocation, before = {}, after = {}", jdbcInputSplit.getStartLocation(), startLocation);
                jdbcInputSplit.setStartLocation(startLocation);
                useMaxFunc = false;
            }
            String restoreFilter = buildIncrementFilter(restoreColumn.getType(),
                                                        restoreColumn.getName(),
                                                        jdbcInputSplit.getStartLocation(),
                                                        jdbcInputSplit.getEndLocation(),
                                                        customSql,
                                                        useMaxFunc);

            if (StringUtils.isNotEmpty(restoreFilter)) {
                restoreFilter = " and " + restoreFilter;
            }

            querySql = querySql.replace(DbUtil.RESTORE_FILTER_PLACEHOLDER, restoreFilter);
        }

        querySql = querySql.replace(DbUtil.INCREMENT_FILTER_PLACEHOLDER, StringUtils.EMPTY);
    } else if (incrementConfig.isIncrement()) {
        querySql = buildIncrementSql(jdbcInputSplit, querySql);
    }

    LOG.warn("Executing sql is: '{}'", querySql);

    return querySql;
}

Writer

  • 支持批量写
@Override
protected void writeMultipleRecordsInternal() throws Exception {
    try {
        for (Row row : rows) {
            for (int index = 0; index < row.getArity(); index++) {
                preparedStatement.setObject(index+1, getField(row, index));
            }
            preparedStatement.addBatch();

            if (restoreConfig.isRestore()) {
                if (lastRow != null){
                    readyCheckpoint = !ObjectUtils.equals(lastRow.getField(restoreConfig.getRestoreColumnIndex()),
                                                          row.getField(restoreConfig.getRestoreColumnIndex()));
                }

                lastRow = row;
            }
        }

        preparedStatement.executeBatch();

        if(restoreConfig.isRestore()){
            rowsOfCurrentTransaction += rows.size();
        }else{
            //手动提交事务
            DbUtil.commit(dbConn);
        }
        preparedStatement.clearBatch();
    } catch (Exception e){
        LOG.warn("write Multiple Records error, row size = {}, first row = {},  e = {}",
                 rows.size(),
                 rows.size() > 0 ? GsonUtil.GSON.toJson(rows.get(0)) : "null",
                 ExceptionUtil.getErrorMessage(e));
        LOG.warn("error to writeMultipleRecords, start to rollback connection, e = {}", ExceptionUtil.getErrorMessage(e));
        DbUtil.rollBack(dbConn);
        throw e;
    }finally {
        //执行完后清空batch
        preparedStatement.clearBatch();
    }
}

位点

  • 利用 flink 的 checkpoint 机制
  • 数据源(这里特指关系数据库)中必须包含一个升序的字段,比如主键
  • 同步过程中会使用 checkpoint 机制记录这个字段的值,任务恢复运行时使用这个字段构造查询条件过滤已经同步过的数据
  • 数据源必须支持数据过滤(即比如 id)。用 restoreColumnName 或 restoreColumnIndex (如 id 字段)作为 checkpoint。

Mysql 增量

  • Reader 基于 Canal
  • Reader 不支持多 channel,只能是 1。
  • Writer 支持多 channel。
  • 只支持 DML
  • 支持多种条件过滤

Reader 给 Writer 的数据结构

  • 每个 event 变成一个 Flink 的 Row
  • Row 中包含了 before / after 的所有数据
{schema=rpl, after_id=222222, type=INSERT, after_value=2222222, table=t1, ts=6778672092931035136}
{schema=rpl, type=DELETE, table=t1, before_value=2222222, ts=6778672106155675648, before_id=222222}
{schema=rpl, after_id=222222, type=UPDATE, after_value=1, table=t1, before_value=2222222, ts=6778673333039927296, before_id=222222}

位点(checkpoint)

  • 以 binlog position 作为位点(canal 解析出来的数据结构)
com/dtstack/flinkx/binlog/format/BinlogInputFormat.java
@Override
public FormatState getFormatState() {
    if (!restoreConfig.isRestore()) {
        LOG.info("return null for formatState");
        return null;
    }

    super.getFormatState();
    if (formatState != null) {
        formatState.setState(entryPosition);
    }
    return formatState;
}

并发

Flink 任务调度

  • worker(TaskManager)= JVM 进程。
  • task slots:为了控制一个 TaskManager 中接受多少个 task,在运行时,一个 slot = 一个线程。
  • 一个 worker 的 所有 slot 平分 JVM 进程的 managed memory 内存资源,但 cpu 资源则争抢。
  • K8S 等容器化部署中,一个 TaskManager 一般只有一个 slot。

?

Flink 并行度与任务链

  • 如果并行度相同,Flink 允许多个 subTask 共同运行在一个 slot 中,组成一个 task,只要是来自于同一作业即可。结果就是一个 slot 可以持有整个作业管道。
  • subTask = spark 的 task。task = spark 的 taskSet。
  • 可以充分利用分配的资源,减少 Slot 使用,减少 rebalance。

?

多 channel

并发层级

  • channel: reader 和 writer 相同并发数,推荐使用
  • readerChannel: reader 单独的并发数
  • writerChannel:writer 单独的并发数
  • 不推荐单独设置 readerChannel 和 writerChannel,除非 reader 或 writer 一方为瓶颈。
"speed": {
    "channel": 1,
    "readerChannel": 3,
    "writerChannel": 3,
    "bytes": 0
}

Reader Writer channel 数不同

  • "readerChannel": 2,"writerChannel": 3
  • 需要 rebalance,会有线程上下文切换(如果 reader task 和 writer task 在属于同一个 TaskManager),或者会有网络传输(reader task 和 writer task 不属于同一个 TaskManager)

?

Reader Writer channel 数相同

  • "readerChannel": 3, ?"writerChannel": 3
  • 由于 Flink 的任务链机制,只会有 3 个 task,且不会产生 rebalance。

?

增量并发

  • 只支持 Writer 并发。

限速

  • 通过 bytes 参数限流。
  • 使用 RateLimiter 进行 bytes 维度的限速。
"speed": {
        "channel": 1,
        "bytes": 0
      },

失败恢复

Flink 的故障恢复

checkpoint

  • 一个 job 的 checkpoint 由 JobManager 的 Checkpoint Coordinator 发起。

?

?

  • state.backend: filesystem
  • 一次 Checkpoint 由 Checkpoint Coordinator(在 JobManager 中,即 Master) 发起。
  • 如果一个 Operator 有多个流入,则需要等多个流入对齐。
  • 每一步 Operator 完成自己的快照后,都要向 Coordinator 发送 ACK。
  • 所有 Operator 完成快照后,一次 Checkpoint 才算完成。

?

恢复

  • 重启应用,在集群上重新部署数据流图。
  • 从持久化存储上读取最近一次的Checkpoint数据,加载到各算子子任务上。
  • 继续处理新流入的数据。

这样的机制可以保证 Flink 内部状态的 Excatly-Once 一致性。至于端到端的Exactly-Once一致性,要根据Source和Sink的具体实现而定。当发生故障时,一部分数据有可能已经流入系统,但还未进行Checkpoint,Source的Checkpoint记录了输入的Offset;当重启时,Flink能把最近一次的Checkpoint恢复到内存中,并根据Offset,让Source从该位置重新发送一遍数据,以保证数据不丢不重。像Kafka等消息队列是提供重发功能的,socketTextStream就不具有这种功能,也意味着不能保证Exactly-Once投递保障。

Exactly-Once

故障发生

?

事务写

  • link先将待输出的数据保存下来暂时不向外部系统提交,等待Checkpoint结束的时刻,Flink上下游所有算子的数据都是一致时,将之前保存的数据全部提交(Commit)到外部系统。
  • 只有经过Checkpoint确认的数据才向外部系统写入。
  • WAL
  • 2PC

?

  • 事务写的方式能提供端到端的 Exactly-Once 一致性,但牺牲了性能。
  • 输出数据不再是实时写入到外部系统,而是分批次地提交。

Flinkx 的快照

  • 断点续传(全量)和实时采集(增量)都依赖于flink的checkpoint机制
  • Checkpoint是Flink实现容错机制最核心的功能,它能够根据配置周期性地基于Stream中各个Operator的状态来生成Snapshot
  • 数据源(这里特指关系数据库)中必须包含一个升序的字段,比如主键或者日期类型的字段
  • 数据源必须支持数据过滤,如果不支持的话,任务就无法从断点处恢复运行,会导致数据重复
  • 要做到 Exactly-Once,目标数据源必须支持事务,比如关系数据库,文件类型的数据源也可以通过临时文件的方式支持

例子:Mysql To HDFS

  • 假设checkpoint触发时两个通道的读取和写入情况如图

?

触发 checkpoint 后:

  • 两个reader先生成Snapshot记录读取状态,通道0的状态为 id=12,通道1的状态为 id=11。
  • Snapshot生成之后向数据流里面插入barrier,barrier随数据流向Writer。
  • 以Writer_0为例,以Writer_0接收Reader_0和Reader_1的发来的数据,假设先收到了Reader_0的barrier,这个时候Writer_0停止写出数据到HDFS,将接收到的数据先放到 InputBuffer里面,一直等待Reader_1的barrier到达之后再将Buffer里的数据全部写出,然后生成Writer的Snapshot。
  • 然后,整个 checkpoint 结束。
  • 如果在整个 checkpoint 过程中,有任何一个 operator 异常,任务会直接失败,这样这次快照就不会生成,任务恢复时会从上一个成功的快照恢复。
  • 简单来说,必须要所有的 Writer 写成功后,该次 checkpoint 才算生效。

Flinkx 的故障恢复

  • 依赖 Flink 的基于 checkpoint 的任务故障恢复。

资源管理

调度

  • 1 个 Flink Master 包含一个 Flink Master 中有一个 Resource Manager 和多个 Job Manager ,Flink Master 中每一个 Job Manager 单独管理一个具体的 Job。
  • 两层调度机制。
  • 第一层,Job Manager 中的 Scheduler 组件负责调度执行该 Job 的 DAG 中所有 Task ,发出资源请求,即整个资源调度的起点。
  • JobManager 中的 Slot Pool 组件持有分配到该 Job 的所有资源,Shceduler 负责给每个 task 从 Slot Pool 中分配 Slot 资源。
  • TaskManager 每个 Slot 负责运行任务。

?

https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
@Override
public CompletableFuture<Acknowledge> requestSlot(
    JobMasterId jobMasterId, SlotRequest slotRequest, final Time timeout) {

    JobID jobId = slotRequest.getJobId();
    JobManagerRegistration jobManagerRegistration = jobManagerRegistrations.get(jobId);

    if (null != jobManagerRegistration) {
        if (Objects.equals(jobMasterId, jobManagerRegistration.getJobMasterId())) {
            log.info(
                "Request slot with profile {} for job {} with allocation id {}.",
                slotRequest.getResourceProfile(),
                slotRequest.getJobId(),
                slotRequest.getAllocationId());

            try {
                slotManager.registerSlotRequest(slotRequest);
            } catch (ResourceManagerException e) {
                return FutureUtils.completedExceptionally(e);
            }

            return CompletableFuture.completedFuture(Acknowledge.get());
        } else {
            return FutureUtils.completedExceptionally(
                new ResourceManagerException(
                    "The job leader's id "
                    + jobManagerRegistration.getJobMasterId()
                    + " does not match the received id "
                    + jobMasterId
                    + '.'));
        }

    } else {
        return FutureUtils.completedExceptionally(
            new ResourceManagerException(
                "Could not find registered job manager for job " + jobId + '.'));
    }
}
https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerImpl.java
/**
     * Requests a slot with the respective resource profile.
     *
     * @param slotRequest specifying the requested slot specs
     * @return true if the slot request was registered; false if the request is a duplicate
     * @throws ResourceManagerException if the slot request failed (e.g. not enough resources left)
     */
@Override
public boolean registerSlotRequest(SlotRequest slotRequest) throws ResourceManagerException {
    checkInit();

    if (checkDuplicateRequest(slotRequest.getAllocationId())) {
        LOG.debug(
            "Ignoring a duplicate slot request with allocation id {}.",
            slotRequest.getAllocationId());

        return false;
    } else {
        PendingSlotRequest pendingSlotRequest = new PendingSlotRequest(slotRequest);

        pendingSlotRequests.put(slotRequest.getAllocationId(), pendingSlotRequest);

        try {
            internalRequestSlot(pendingSlotRequest);
        } catch (ResourceManagerException e) {
            // requesting the slot failed --> remove pending slot request
            pendingSlotRequests.remove(slotRequest.getAllocationId());

            throw new ResourceManagerException(
                "Could not fulfill slot request " + slotRequest.getAllocationId() + '.', e);
        }

        return true;
    }
}

/**
     * Tries to allocate a slot for the given slot request. If there is no slot available, the
     * resource manager is informed to allocate more resources and a timeout for the request is
     * registered.
     *
     * @param pendingSlotRequest to allocate a slot for
     * @throws ResourceManagerException if the slot request failed or is unfulfillable
     */
private void internalRequestSlot(PendingSlotRequest pendingSlotRequest)
    throws ResourceManagerException {
    final ResourceProfile resourceProfile = pendingSlotRequest.getResourceProfile();

    OptionalConsumer.of(findMatchingSlot(resourceProfile))
        .ifPresent(taskManagerSlot -> allocateSlot(taskManagerSlot, pendingSlotRequest))
        .ifNotPresent(
        () ->
        fulfillPendingSlotRequestWithPendingTaskManagerSlot(
            pendingSlotRequest));
}

/**
     * Finds a matching slot for a given resource profile. A matching slot has at least as many
     * resources available as the given resource profile. If there is no such slot available, then
     * the method returns null.
     *
     * <p>Note: If you want to change the behaviour of the slot manager wrt slot allocation and
     * request fulfillment, then you should override this method.
     *
     * @param requestResourceProfile specifying the resource requirements for the a slot request
     * @return A matching slot which fulfills the given resource profile. {@link Optional#empty()}
     *     if there is no such slot available.
     */
private Optional<TaskManagerSlot> findMatchingSlot(ResourceProfile requestResourceProfile) {
    final Optional<TaskManagerSlot> optionalMatchingSlot =
        // slotMatchingStrategy 的 findMatchingSlot,分配 slot 给 job 的策略。
        slotMatchingStrategy.findMatchingSlot(
        requestResourceProfile,
        freeSlots.values(),
        this::getNumberRegisteredSlotsOf);

    optionalMatchingSlot.ifPresent(
        taskManagerSlot -> {
            // sanity check
            Preconditions.checkState(
                taskManagerSlot.getState() == SlotState.FREE,
                "TaskManagerSlot %s is not in state FREE but %s.",
                taskManagerSlot.getSlotId(),
                taskManagerSlot.getState());

            freeSlots.remove(taskManagerSlot.getSlotId());
        });

    return optionalMatchingSlot;
}

AnyMatchingSlotMatchingStrategy
https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/AnyMatchingSlotMatchingStrategy.java
/** {@link SlotMatchingStrategy} which picks the first matching slot. */
// 找到符合条件的任何一个 slot
public enum AnyMatchingSlotMatchingStrategy implements SlotMatchingStrategy {
    INSTANCE;

    @Override
    public <T extends TaskManagerSlotInformation> Optional<T> findMatchingSlot(
            ResourceProfile requestedProfile,
            Collection<T> freeSlots,
            Function<InstanceID, Integer> numberRegisteredSlotsLookup) {

        return freeSlots.stream()
                .filter(slot -> slot.isMatchingRequirement(requestedProfile))
                .findAny();
    }
}

LeastUtilizationSlotMatchingStrategy
// 从所有 Worker 中找出 Worker 中剩余 Slot 最多的那个,然后从此 Worker 的 Slot 中取一个。
https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/LeastUtilizationSlotMatchingStrategy.java
/**
 * {@link SlotMatchingStrategy} which picks a matching slot from a TaskExecutor with the least
 * utilization.
 */
public enum LeastUtilizationSlotMatchingStrategy implements SlotMatchingStrategy {
    INSTANCE;

    @Override
    public <T extends TaskManagerSlotInformation> Optional<T> findMatchingSlot(
            ResourceProfile requestedProfile,
            Collection<T> freeSlots,
            Function<InstanceID, Integer> numberRegisteredSlotsLookup) {
        final Map<InstanceID, Integer> numSlotsPerTaskExecutor =
                freeSlots.stream()
                        .collect(
                                Collectors.groupingBy(
                                        TaskManagerSlotInformation::getInstanceId,
                                        Collectors.reducing(0, i -> 1, Integer::sum)));

        return freeSlots.stream()
                .filter(taskManagerSlot -> taskManagerSlot.isMatchingRequirement(requestedProfile))
                .min(
                        Comparator.comparingDouble(
                                taskManagerSlot ->
                                        calculateUtilization(
                                                taskManagerSlot.getInstanceId(),
                                                numberRegisteredSlotsLookup,
                                                numSlotsPerTaskExecutor)));
    }

    private static double calculateUtilization(
            InstanceID instanceId,
            Function<? super InstanceID, Integer> numberRegisteredSlotsLookup,
            Map<InstanceID, Integer> numSlotsPerTaskExecutor) {
        final int numberRegisteredSlots = numberRegisteredSlotsLookup.apply(instanceId);

        Preconditions.checkArgument(
                numberRegisteredSlots > 0,
                "The TaskExecutor %s has no slots registered.",
                instanceId);

        final int numberFreeSlots = numSlotsPerTaskExecutor.getOrDefault(instanceId, 0);

        Preconditions.checkArgument(
                numberRegisteredSlots >= numberFreeSlots,
                "The TaskExecutor %s has fewer registered slots than free slots.",
                instanceId);

        return (double) (numberRegisteredSlots - numberFreeSlots) / numberRegisteredSlots;
    }
}

/**
     * Allocates the given slot for the given slot request. This entails sending a registration
     * message to the task manager and treating failures.
     *
     * @param taskManagerSlot to allocate for the given slot request
     * @param pendingSlotRequest to allocate the given slot for
     */
private void allocateSlot(
    TaskManagerSlot taskManagerSlot, PendingSlotRequest pendingSlotRequest) {
    

CPU

一个 TaskManager 对应 1-n 个 Slot,所有 Slot 共享 CPU 时间片。

内存

  • 对于批处理,多个 Slot 平分 TaskManager 的 managed memory,用于排序、哈希表及缓存中间结果。
  • 对于流处理,managed memory 只用于 RocksDB State Backend(存储 checkpoint)。
  • FlinkX 的用户代码(Reader,Writer)使用 JVM 的堆内存。

和精卫对比

精卫

Flinkx

架构

ZK + Meta DB + 容器集群 + 控制台

Flink 集群

调度

级别:

任务级别分配,而不是服务级分配。

策略:

新任务:Leader 从所有 Worker 中找到内存剩余最多的一个。

旧任务:任务与机器键存在粘性

级别:

两个层级:job 级别 + task 级别。

job 级别:jobManager 向 resourceManager 获取整个 job 需要的 slot 资源,保证整个 job 能够全部运行。

task 级别:job 内部的 scheduler 在已经分配到的 slot 中为各个 task 分配资源。

策略:

LeastUtilizationSlotMatchingStrategy 或?AnyMatchingSlotMatchingStrategy

集群重启

任务自动恢复

需要手动启动

元数据管理

集中管理

用户自己编辑文件

故障恢复

位点

checkpoint 机制

Exactly-Once

No

Yes

是否需要额外存储

No

No(对比类似架构的 DTS,DTS 是需要的)

任务整合

一个任务多张表

只支持一个任务一张表

统计信息

tps/latency/消息总数。

便于实时监控。

bytes/errorCount/numRead/numWrite。

对数据量统计更详细。

但对于实时任务监控缺失(如 latency)。

任务隔离

进程级

线程级

任务物理资源管理

多个 task 共享 CPU。

内存由 JVM 托管,单个 task 独享堆内存。

不同任务可以定制不同 JVM 内存大小。

多个 task 共享 CPU。

不能为单个任务定制 JVM 内存大小,只能在集群统一配置 slot 内存。

如果是容器化部署,一般一个 TaskManager 部署一个 Slot,则 CPU 和内存都是独享。

流量控制

RateLimiter,tps 维度

RateLimiter, bytes 维度

运维

精卫集群管理

主要是 Flink 运维,如果要做成平台,需二次开发

受众

稳定的服务级产品,保障长期任务。

更方便的定制和人工运维。

开源,小规模使用,短平快

Flink CDC

  • Debezium

https://blog.csdn.net/young_0609/article/details/108412891

https://zhuanlan.zhihu.com/p/274492805?utm_source=wechat_session

参考文档

Flinkx git官方:https://github.com/DTStack/flinkx/blob/1.10_release/README_CH.md

Flink 官方文档:https://ci.apache.org/projects/flink/flink-docs-stable/zh/

Flink任务调度原理之并行度与任务链:https://dragonlsl.blog.csdn.net/article/details/106005264

Flink任务调度原理之TaskManager 与Slots:https://dragonlsl.blog.csdn.net/article/details/105823127

Flink任务调度原理之逻辑数据流与执行图:https://dragonlsl.blog.csdn.net/article/details/105957694

Flinx 失败恢复:https://github.com/DTStack/flinkx/blob/1.10_release/docs/restore.md

Fink 自定义 Source 和 Sink:https://cloud.tencent.com/developer/article/1694062

Flink checkpoint 机制,分布式快照:https://zhuanlan.zhihu.com/p/104601440

Flink 如何保证 Exactly-Once:https://zhuanlan.zhihu.com/p/108570573

浅谈 Flink 分布式运行时和数据流图的并行化: https://zhuanlan.zhihu.com/p/107336730

Flink 端到端 Exactly-once:https://zhuanlan.zhihu.com/p/68797265

Flink资源管理:https://zhuanlan.zhihu.com/p/149144610

八张图搞懂 Flink 端到端精准一次处理语义 Exactly-once:https://zhuanlan.zhihu.com/p/348559815

大数据架构资料集:https://github.com/lhh2002/Framework-Of-BigData

Flink 资源分配和并行度深度剖析: https://mp.weixin.qq.com/s?__biz=MzA3MjQ1MTQzMQ==&mid=2247488786&idx=1&sn=118c92aadd0cedc8a80d671ac86d566a&chksm=9f1f411aa868c80ce2e20838c95d7b5cb673689fac521da1511a696275750d6c9ac27ea375ce&token=1486904052&lang=zh_CN#rd

深入解读 Flink 资源管理机制:https://www.jianshu.com/p/726bef539ea8

配置 TaskManager 内存:https://ci.apache.org/projects/flink/flink-docs-master/zh/docs/deployment/memory/mem_setup_tm/

TaskManager内存模型:https://zhuanlan.zhihu.com/p/340345588

一文带你彻底了解大数据处理引擎Flink内存管理: https://www.cnblogs.com/huaweiyun/p/14142780.html

Flink 原理与实现:内存管理: http://wuchong.me/blog/2016/04/29/flink-internals-memory-manage/

TaskManager 的 Slot 平分 managed memory:https://stackoverflow.com/questions/47197744/confused-about-flink-task-slot

Flink的内存模型:https://blog.csdn.net/zc19921215/article/details/106843097

配置堆内存和托管内存:https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/deployment/memory/mem_setup_tm.html#%E4%BB%BB%E5%8A%A1%E7%AE%97%E5%AD%90%E5%A0%86%E5%86%85%E5%AD%98

Flink的 slot 到底是什么:https://cloud.tencent.com/developer/article/1693328

Flink TaskManager slot计算策略:https://www.yuque.com/aitozi/blog/gv2wdv?language=en-us

Mysql 快照读:

基于 Flink SQL CDC 的实时数据同步方案:https://zhuanlan.zhihu.com/p/274492805?utm_source=wechat_session

Flink SQL CDC 13 条生产实践经验:https://blog.csdn.net/young_0609/article/details/108412891

Flink Managed Memory:

Flink集群运维:https://zhuanlan.zhihu.com/p/99755112

Flink 在监控系统上的实践和应用:https://www.infoq.cn/article/rajdtrlks*aglvjtv7ah

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-08-26 12:10:28  更:2021-08-26 12:10:37 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年10日历 -2025/10/24 17:57:58-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码