IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> Flink SQL 自定义 Connector -> 正文阅读

[大数据]Flink SQL 自定义 Connector

一、Flink Table 实现架构图:

在这里插入图片描述

二、Flink Table Conncetor流程解析

1、定义动态工厂类

自定义Factory继承DynamicTableSinkFactory,DynamicTableSourceFactory接口,支持读取和写入两种功能。

拿DynamicTableSourceFactory接口来说,需要实现以下几种方法:

@Override
    public DynamicTableSink createDynamicTableSource(Context context) {
        return null;
    }
 
    @Override
    public String factoryIdentifier() {
        return null;
    }
 
    @Override
    public Set<ConfigOption<?>> requiredOptions() {
        return null;
    }
 
    @Override
    public Set<ConfigOption<?>> optionalOptions() {
        return null;
    }
1.factoryIdentifier():
	指定工厂类的标识符,该标识符就是建表时必须填写的connector参数的值
2.requiredOptions()with里面必须要填写的属性配置 (比如URL,TABLE_NAME,USERNAME,PASSWORD)
3.optionalOptions(): 
	with里面非必须填写属性配置(比如提交方式,提交批次等)
4.createDynamicTableSource(): 
	创建TableSource对象,返回一个 DynamicTableSource

2、定义Connector Source/Sink类

(1)定义Connector Sink类

继承DynamicTableSink接口,实现以下方法:

	@Override
    public ChangelogMode getChangelogMode(ChangelogMode changelogMode) {
        return null;
    }
 
    @Override
    public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
        return null;
    }
 
    @Override
    public DynamicTableSink copy() {
        return null;
    }
 
    @Override
    public String asSummaryString() {
        return null;
    }
1.getChangelogMode(): 
	写入方式默认INSERT_ONLY,里面实现了一个static静态类初始化:INSERT_ONLY =newBuilder().addContainedKind(RowKind.INSERT).build();
具体实现,看下源码就知道了....
2.getSinkRuntimeProvider(): 
	Sink端输入的主要提供者,这里可以实现一个自定义的OutputFormat继承,当然也有人直接继承RichSinkFunction..这个和dataStream API实现写入方式就很像了。
RichOutputFormat<RowData>主要是写入逻辑的实现。
3.copy():
	返回一个DynamicTableSink
4.asSummaryString(): 
	接口功能总结,可以概述下这个spi主要是xxxx Sink

(2)定义Connector Source类

- Scan Table Source

ScanTableSource在运行时扫描来自外部存储系统的所有行。

实现ScanTableSource接口,实现方法如下:

	@Override
    public ChangelogMode getChangelogMode() {
     	return null;
    }

	@Override
    public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) {
        return null;
    }
 
    @Override
    public DynamicTableSource copy() {
        return null;
    }
 
    @Override
    public String asSummaryString() {
        return null;
    }

- Lookup Table Source

LookupTableSource在运行时通过一个或多个键查找外部存储系统的行。

与ScanTableSource相比,该source不必读取整个表,并且可以在需要时从(可能不断变化的)外部表中延迟获取单个值。

与ScanTableSource相比,LookupTableSource当前仅支持发出仅插入的更改。

这里着重提一下Source端的自定义Connector,它是需要继承LookupTableSource接口,实现Lookup功能才能当做Source端使用。

	@Override
    public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext lookupContext) {
        return null;
    }
 
    @Override
    public DynamicTableSource copy() {
        return null;
    }
 
    @Override
    public String asSummaryString() {
        return null;
    }

getLookupRuntimeProvider(): 从社区已经提供的Connector中可以看到如果想实现一个Lookup功能,需要继承TableFunction接口.

(3)实现Source Function

继承RichSourceFunction类

	@Override
    public void run(SourceContext<RowData> sourceContext) throws Exception {
        
    }

    @Override
    public void cancel() {

    }
1.run():
	连接数据源,编写具体实现方式,数据返回格式
2.cancel():
	断开数据源连接

三、flink-sql-connector-mqtt 实例

添加环境依赖

<dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-common</artifactId>
            <version>1.13.6</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.11</artifactId>
            <version>1.13.6</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge_2.11</artifactId>
            <version>1.13.6</version>
            <scope>compile</scope>
        </dependency>

        <!--mqtt连接依赖-->
        <dependency>
            <groupId>org.fusesource.mqtt-client</groupId>
            <artifactId>mqtt-client</artifactId>
            <version>1.12</version>
        </dependency>
        <!--MQTT-->
        <dependency>
            <groupId>org.eclipse.paho</groupId>
            <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
            <version>1.2.0</version>
        </dependency>
    </dependencies>

1、MqttDynamicTableSourceFactory

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.connector.format.DecodingFormat;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.factories.DeserializationFormatFactory;
import org.apache.flink.table.factories.DynamicTableSourceFactory;
import org.apache.flink.table.factories.FactoryUtil;

import java.util.HashSet;
import java.util.Set;

import static org.apache.flink.table.factories.FactoryUtil.createTableFactoryHelper;

public class MqttDynamicTableSourceFactory implements DynamicTableSourceFactory {

    @Override
    /**
     * TODO 1、创建动态表source
     * DynamicTableFactory需要具备以下功能:
     *      -定义与校验建表时传入的各项参数;
     *      -获取表的元数据;
     *      -定义读写数据时的编码/解码格式(非必需);
     *      -创建可用的DynamicTable[Source/Sink]实例。
     */
    public DynamicTableSource createDynamicTableSource(Context context) {
        //内置工具类校验传入参数
        FactoryUtil.TableFactoryHelper helper  = createTableFactoryHelper(this, context);
        helper.validate();
        // 获取有效参数
        final ReadableConfig options = helper.getOptions();

        final DecodingFormat<DeserializationSchema<RowData>> decodingFormat = helper.discoverDecodingFormat(
                DeserializationFormatFactory.class,
                FactoryUtil.FORMAT);


        // 获取元数据信息
        TableSchema schema = context.getCatalogTable().getSchema();

        // 创建并且返回一个动态表源
        return new MqttDynamicTableSource(options,decodingFormat,schema);
    }

    @Override
    //TODO 2、指定工厂类的标识符,该标识符就是建表时必须填写的connector参数的值
    public String factoryIdentifier() {
        return "mqtt";
    }

    @Override
    //TODO 3、with里面必须要填写的属性配置
    public Set<ConfigOption<?>> requiredOptions() {
        final Set<ConfigOption<?>> options = new HashSet<>();
        options.add(HOSTURL);
        options.add(TOPIC);
        options.add(FactoryUtil.FORMAT); // use pre-defined option for format
        return options;
    }

    @Override
    //TODO 4、with里面非必须填写属性配置
    public Set<ConfigOption<?>> optionalOptions() {
        final Set<ConfigOption<?>> options = new HashSet<>();
        options.add(USERNAME);
        options.add(PASSWORD);
        options.add(CLIENTID);
        return options;
    }

    //TODO 5、定义MQTT Connector需要的各项参数
    public static final ConfigOption<String> HOSTURL =
            ConfigOptions.key("hosturl")
                    .stringType()
                    .noDefaultValue()
                    .withDescription("the mqtt's connect hosturl.");

    public static final ConfigOption<String> USERNAME =
            ConfigOptions.key("username")
                    .stringType()
                    .noDefaultValue()
                    .withDescription("the mqtt's connect username.");

    public static final ConfigOption<String> PASSWORD =
            ConfigOptions.key("password")
                    .stringType()
                    .noDefaultValue()
                    .withDescription("the mqtt's connect password.");

    public static final ConfigOption<String> TOPIC =
            ConfigOptions.key("topic")
                    .stringType()
                    .noDefaultValue()
                    .withDescription("the mqtt's connect topic.");

    public static final ConfigOption<String> CLIENTID =
            ConfigOptions.key("clientid")
                    .stringType()
                    .noDefaultValue()
                    .withDescription("the mqtt's connect clientId.");
}

2、MqttDynamicTableSource

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.connector.format.DecodingFormat;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.connector.source.ScanTableSource;
import org.apache.flink.table.connector.source.SourceFunctionProvider;
import org.apache.flink.table.data.RowData;
import org.apache.flink.types.RowKind;

public class MqttDynamicTableSource implements ScanTableSource {
    private ReadableConfig options;
    private TableSchema schema;
    private DecodingFormat<DeserializationSchema<RowData>> decodingFormat;
    public MqttDynamicTableSource(ReadableConfig options,DecodingFormat<DeserializationSchema<RowData>> decodingFormat, TableSchema schema){
        this.options = options;
        this.decodingFormat = decodingFormat;
        this.schema = schema;
    }

    @Override
    //写入方式默认INSERT_ONLY,里面实现了一个static静态类初始化
    public ChangelogMode getChangelogMode() {
//        return decodingFormat.getChangelogMode();
        return ChangelogMode.newBuilder()
                .addContainedKind(RowKind.INSERT)
                .addContainedKind(RowKind.UPDATE_BEFORE)
                .addContainedKind(RowKind.UPDATE_AFTER)
                .addContainedKind(RowKind.DELETE)
                .build();
    }

    @Override
    //获取运行时类
    public ScanRuntimeProvider getScanRuntimeProvider(ScanContext ctx) {

        final DeserializationSchema<RowData> deserializer = decodingFormat.createRuntimeDecoder(
                ctx,
                schema.toPhysicalRowDataType());
        final SourceFunction<RowData> sourceFunction = new MqttSourceFunction(options,deserializer);
        return SourceFunctionProvider.of(sourceFunction, false);
    }

    @Override
    public DynamicTableSource copy() {
        return new MqttDynamicTableSource(options,decodingFormat,schema);
    }

    @Override
    public String asSummaryString() {
        return "Mqtt Table Source";
    }
}

3、MqttSourceFunction

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.table.data.RowData;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

import java.io.ByteArrayOutputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

import static org.apache.flink.table.connector.MqttDynamicTableSourceFactory.*;

public class MqttSourceFunction extends RichSourceFunction<RowData> {
    //MQTT连接配置信息
    private ReadableConfig confs;
    //阻塞队列存储订阅的消息
    private BlockingQueue<RowData> queue = new ArrayBlockingQueue<>(10);
    //存储服务
    private MqttClient client;
    //存储订阅主题
    private DeserializationSchema<RowData> deserializer;

    public MqttSourceFunction(ReadableConfig options,DeserializationSchema<RowData> deserializer) {
        this.confs = options;
        this.deserializer = deserializer;
    }

    //包装连接的方法
    private void connect() throws MqttException {
        //连接mqtt服务器
        client = new MqttClient(confs.get(HOSTURL), confs.get(CLIENTID), new MemoryPersistence());
        MqttConnectOptions options = new MqttConnectOptions();
        options.setCleanSession(false);
        options.setUserName(confs.get(USERNAME));
        options.setPassword(confs.get(PASSWORD).toCharArray());
        options.setCleanSession(false);   //是否清除session
        // 设置超时时间
        options.setConnectionTimeout(30);
        // 设置会话心跳时间
        options.setKeepAliveInterval(20);
        try {
            String[] topics = confs.get(TOPIC).split(",");
            //订阅消息
            int[] qos = new int[topics.length];
            for (int i = 0; i < topics.length; i++) {
                qos[i] = 0;
            }
            client.setCallback(new MsgCallback(client, options, topics, qos) {
            });
            client.connect(options);
            client.subscribe(topics, qos);
            System.out.println("MQTT连接成功:" + confs.get(CLIENTID) + ":" + client);
        } catch (Exception e) {
            System.out.println("MQTT连接异常:" + e);
        }
    }

    //实现MqttCallback,内部函数可回调
    class MsgCallback implements MqttCallback {
        private MqttClient client;
        private MqttConnectOptions options;
        private String[] topic;
        private int[] qos;

        public MsgCallback() {
        }

        public MsgCallback(MqttClient client, MqttConnectOptions options, String[] topic, int[] qos) {
            this.client = client;
            this.options = options;
            this.topic = topic;
            this.qos = qos;
        }

        //连接失败回调该函数
        @Override
        public void connectionLost(Throwable throwable) {
            System.out.println("MQTT连接断开,发起重连");
            while (true) {
                try {
                    Thread.sleep(1000);
                    client.connect(options);
                    //订阅消息
                    client.subscribe(topic, qos);
                    System.out.println("MQTT重新连接成功:" + client);
                    break;
                } catch (Exception e) {
                    e.printStackTrace();
                    continue;
                }
            }
        }

        //收到消息回调该函数
        @Override
        public void messageArrived(String s, MqttMessage message) throws Exception {
            System.out.println();
            //订阅消息字符
            queue.put(deserializer.deserialize(message.getPayload()));

        }

        //对象转化为字节码
        public byte[] getBytesFromObject(Serializable obj) throws Exception {
            if (obj == null) {
                return null;
            }
            ByteArrayOutputStream bo = new ByteArrayOutputStream();
            ObjectOutputStream oo = new ObjectOutputStream(bo);
            oo.writeObject(obj);
            return bo.toByteArray();
        }

        @Override
        public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {

        }
    }

    //flink线程启动函数
    @Override
    public void run(SourceContext<RowData>  ctx) throws Exception {
        connect();
        //利用死循环使得程序一直监控主题是否有新消息
        while (true) {
            //使用阻塞队列的好处是队列空的时候程序会一直阻塞到这里不会浪费CPU资源
            ctx.collect(queue.take());
        }
    }

    @Override
    public void cancel() {
        try {
            client.disconnect();
        } catch (Throwable t) {
            // ignore
        }
    }
}

4、META-INF.services

Flink SQL的自定义Connector主要是用Java的SPI方式可以加载到应用服务当中,根据SPI机制的使用机制,需要在resources目录下新建META-INF.services新建文件
org.apache.flink.table.factories.Factory
文件中添加工厂类的路径就行。
在这里插入图片描述

四、打包(fat)

Flink使用Java的Service Provider Interfaces (SPI) 机制通过特定标识符加载table的connector/format工厂。由于每个table的connector/format的名为org.apache.flink.table.factories.Factory的SPI资源文件位于同一目录:META-INF/services下,因此在构建使用多个table connector/format的项目的uber jar时,这些资源文件将相互覆盖,这将导致Flink无法正确加载工厂类。
在这种情况下,推荐的方法是通过maven shade插件的ServicesResourceTransformer转换META-INF/services目录下的这些资源文件。

pluginfunction
maven-jar-plugin maven默认打包插件,用来创建 project jar
maven-shade-plugin用来打可执行包,executable(fat) jar
maven-assembly-plugin支持定制化打包方式,例如 apache 项目的打包方式

在配置了ServicesResourceTransformer之后, 项目构建uber-jar时,META-INF/services目录下的这些资源文件会被整合在一起而不是相互覆盖。

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.1.1</version>
                <executions>
                    <execution>
                        <id>shade</id>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <transformers combine.children="append">
                                <!-- The service transformer is needed to merge META-INF/services files -->
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>8</source>
                    <target>8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2022-07-04 22:59:49  更:2022-07-04 23:01:46 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/16 1:42:26-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码