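I. Flink Table implementation architecture:
II. Flink Table Connector workflow
1. Define the dynamic table factory class
A custom factory implements the DynamicTableSinkFactory and DynamicTableSourceFactory interfaces so that the connector supports both reading and writing.
Taking the DynamicTableSourceFactory interface as an example, the following methods need to be implemented: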
@Override
public DynamicTableSource createDynamicTableSource(Context context) {
    return null;
}

@Override
public String factoryIdentifier() {
    return null;
}

@Override
public Set<ConfigOption<?>> requiredOptions() {
    return null;
}

@Override
public Set<ConfigOption<?>> optionalOptions() {
    return null;
}
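1. factoryIdentifier():
Returns the identifier of the factory. This is the value users must supply for the 'connector' option when creating the table.
2. requiredOptions():
The options that must be set in the WITH clause (for example URL, TABLE_NAME, USERNAME, PASSWORD).
3. optionalOptions():
The options that may be set in the WITH clause but are not mandatory (for example the commit mode or the batch size).
4. createDynamicTableSource():
Creates the table source object and returns it as a DynamicTableSource.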
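To make the difference between required and optional options concrete, here is a minimal JDK-only sketch of the kind of check a factory helper performs on the WITH clause. It is a simplified model, not Flink's own validation code. The class name OptionValidationSketch and the option keys (url, table-name, batch-size) are hypothetical, and the model assumes the 'connector' key is handled separately.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Simplified model of required/optional option validation (hypothetical, not Flink code). */
final class OptionValidationSketch {

    /** Rejects a WITH clause that misses a required key or contains an undeclared key. */
    static void validate(Set<String> required, Set<String> optional, Map<String, String> with) {
        for (String key : required) {
            if (!with.containsKey(key)) {
                throw new IllegalArgumentException("Missing required option: " + key);
            }
        }
        Set<String> allowed = new HashSet<>(required);
        allowed.addAll(optional);
        for (String key : with.keySet()) {
            if (!allowed.contains(key)) {
                throw new IllegalArgumentException("Unsupported option: " + key);
            }
        }
    }

    public static void main(String[] args) {
        // Hypothetical option keys, chosen only for illustration.
        Set<String> required = new HashSet<>(Arrays.asList("url", "table-name"));
        Set<String> optional = new HashSet<>(Arrays.asList("batch-size"));

        Map<String, String> with = new HashMap<>();
        with.put("url", "jdbc:demo");
        with.put("table-name", "orders");
        validate(required, optional, with); // passes: every required key is present

        with.remove("table-name");
        try {
            validate(required, optional, with);
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

In this model a missing required key and an unknown key are both rejected before any table source would be created, which is why it pays to declare every supported option in one of the two sets.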
2. Define the connector source/sink classes
(1) Define the connector sink class
Implement the DynamicTableSink interface and the following methods:
@Override
public ChangelogMode getChangelogMode(ChangelogMode changelogMode) {
    return null;
}

@Override
public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
    return null;
}

@Override
public DynamicTableSink copy() {
    return null;
}

@Override
public String asSummaryString() {
    return null;
}
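1. getChangelogMode():
Declares which kinds of changes the sink accepts. A common choice is insert-only, which ChangelogMode initializes as a static constant: INSERT_ONLY = newBuilder().addContainedKind(RowKind.INSERT).build(), returned by ChangelogMode.insertOnly().
See the ChangelogMode source code for the details.
2. getSinkRuntimeProvider():
Supplies the runtime implementation of the sink. You can implement a custom OutputFormat here, or extend RichSinkFunction directly, which is very similar to writing a sink with the DataStream API.
A RichOutputFormat<RowData> mainly carries the actual write logic.
3. copy():
Returns a copy of this DynamicTableSink.
4. asSummaryString():
Returns a short summary of the connector, for example "xxxx Sink".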
(2) Define the connector source class
- Scan Table Source
A ScanTableSource scans all rows from an external storage system at runtime.
Implement the ScanTableSource interface and the following methods:
@Override
public ChangelogMode getChangelogMode() {
    return null;
}

@Override
public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) {
    return null;
}

@Override
public DynamicTableSource copy() {
    return null;
}

@Override
public String asSummaryString() {
    return null;
}
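- Lookup Table Source
A LookupTableSource looks up rows of an external storage system by one or more keys at runtime.
Compared with a ScanTableSource, it does not have to read the entire table and can lazily fetch individual values from a (possibly continuously changing) external table when needed.
Also unlike a ScanTableSource, a LookupTableSource currently supports only insert-only changes.
One point worth stressing: for a custom connector to be usable as a lookup (dimension) table, its source must implement the LookupTableSource interface and provide the lookup logic.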
@Override
public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext lookupContext) {
    return null;
}

@Override
public DynamicTableSource copy() {
    return null;
}

@Override
public String asSummaryString() {
    return null;
}
getLookupRuntimeProvider(): as the connectors already provided by the community show, implementing the lookup feature requires extending the TableFunction class.
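The contrast between the two source types can be illustrated with a small JDK-only sketch. It only models the access pattern (read everything versus fetch by key) against an in-memory map that stands in for the external system. The class ScanVsLookupSketch and its data are hypothetical and do not use any Flink API.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

/** Models scan versus lookup access against a stand-in "external table" (hypothetical). */
final class ScanVsLookupSketch {
    // Stand-in for rows stored in an external system, keyed by id.
    private final Map<Integer, String> external = new LinkedHashMap<>();

    ScanVsLookupSketch() {
        external.put(1, "alice");
        external.put(2, "bob");
        external.put(3, "carol");
    }

    /** Scan: reads every row, regardless of which ones are needed. */
    List<String> scan() {
        return new ArrayList<>(external.values());
    }

    /** Lookup: fetches a single row by key, only when it is needed. */
    Optional<String> lookup(int key) {
        return Optional.ofNullable(external.get(key));
    }

    public static void main(String[] args) {
        ScanVsLookupSketch table = new ScanVsLookupSketch();
        System.out.println(table.scan());
        System.out.println(table.lookup(2));
        System.out.println(table.lookup(9));
    }
}
```

In a real connector the lookup path would typically query the external system per key (possibly with a cache), which is the role the TableFunction plays.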
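(3) Implement the source function
Extend the RichSourceFunction class:
@Override
public void run(SourceContext<RowData> sourceContext) throws Exception {
}

@Override
public void cancel() {
}
1. run():
Connects to the data source, implements the actual reading logic, and emits the records in the expected format.
2. cancel():
Stops the source and disconnects from the data source.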
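The interplay between run() and cancel() is easy to get wrong: if run() blocks forever waiting for data, cancel() cannot stop it. The following JDK-only sketch shows one common pattern, a volatile flag combined with a timed poll on a queue. It is a sketch under stated assumptions rather than Flink code: the class CancellableLoopSketch, the onMessage method, and the 100 ms poll interval are all hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;

/** Cooperative cancellation of a source loop fed by a callback thread (hypothetical sketch). */
final class CancellableLoopSketch {
    private final BlockingQueue<String> queue = new ArrayBlockingQueue<>(10);
    private final List<String> emitted = new CopyOnWriteArrayList<>();
    private volatile boolean running = true;

    /** Stands in for the client callback that receives a message. */
    void onMessage(String payload) throws InterruptedException {
        queue.put(payload);
    }

    /** Stands in for run(): polls with a timeout so the flag is re-checked regularly. */
    void run() {
        try {
            while (running) {
                String next = queue.poll(100, TimeUnit.MILLISECONDS);
                if (next != null) {
                    emitted.add(next); // a real source would call ctx.collect(next) here
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /** Stands in for cancel(): flips the flag so that run() returns. */
    void cancel() {
        running = false;
    }

    List<String> emitted() {
        return new ArrayList<>(emitted);
    }

    /** Runs the loop on a worker thread, feeds two messages, cancels, and returns what was emitted. */
    static List<String> demo() {
        CancellableLoopSketch source = new CancellableLoopSketch();
        Thread worker = new Thread(source::run);
        worker.start();
        try {
            source.onMessage("a");
            source.onMessage("b");
            long deadline = System.currentTimeMillis() + 5000;
            while (source.emitted().size() < 2 && System.currentTimeMillis() < deadline) {
                Thread.sleep(10);
            }
            source.cancel();
            worker.join(2000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        if (worker.isAlive()) {
            throw new IllegalStateException("run() did not stop after cancel()");
        }
        return source.emitted();
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The timed poll trades a little latency for the guarantee that the loop observes the flag even when no messages arrive.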
III. flink-sql-connector-mqtt example
Add the dependencies:
<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-common</artifactId>
        <version>1.13.6</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.11</artifactId>
        <version>1.13.6</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-java-bridge_2.11</artifactId>
        <version>1.13.6</version>
        <scope>compile</scope>
    </dependency>
    <dependency>
        <groupId>org.fusesource.mqtt-client</groupId>
        <artifactId>mqtt-client</artifactId>
        <version>1.12</version>
    </dependency>
    <dependency>
        <groupId>org.eclipse.paho</groupId>
        <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
        <version>1.2.0</version>
    </dependency>
</dependencies>
1. MqttDynamicTableSourceFactory
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.connector.format.DecodingFormat;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.factories.DeserializationFormatFactory;
import org.apache.flink.table.factories.DynamicTableSourceFactory;
import org.apache.flink.table.factories.FactoryUtil;
import java.util.HashSet;
import java.util.Set;
import static org.apache.flink.table.factories.FactoryUtil.createTableFactoryHelper;

public class MqttDynamicTableSourceFactory implements DynamicTableSourceFactory {

    @Override
    public DynamicTableSource createDynamicTableSource(Context context) {
        // Validate the WITH options against requiredOptions() and optionalOptions().
        FactoryUtil.TableFactoryHelper helper = createTableFactoryHelper(this, context);
        helper.validate();
        final ReadableConfig options = helper.getOptions();
        // Discover the decoding format named by the 'format' option.
        final DecodingFormat<DeserializationSchema<RowData>> decodingFormat = helper.discoverDecodingFormat(
                DeserializationFormatFactory.class,
                FactoryUtil.FORMAT);
        TableSchema schema = context.getCatalogTable().getSchema();
        return new MqttDynamicTableSource(options, decodingFormat, schema);
    }

    @Override
    public String factoryIdentifier() {
        return "mqtt";
    }

    @Override
    public Set<ConfigOption<?>> requiredOptions() {
        final Set<ConfigOption<?>> options = new HashSet<>();
        options.add(HOSTURL);
        options.add(TOPIC);
        options.add(FactoryUtil.FORMAT);
        return options;
    }

    @Override
    public Set<ConfigOption<?>> optionalOptions() {
        final Set<ConfigOption<?>> options = new HashSet<>();
        options.add(USERNAME);
        options.add(PASSWORD);
        options.add(CLIENTID);
        return options;
    }

    public static final ConfigOption<String> HOSTURL =
            ConfigOptions.key("hosturl")
                    .stringType()
                    .noDefaultValue()
                    .withDescription("the mqtt's connect hosturl.");

    public static final ConfigOption<String> USERNAME =
            ConfigOptions.key("username")
                    .stringType()
                    .noDefaultValue()
                    .withDescription("the mqtt's connect username.");

    public static final ConfigOption<String> PASSWORD =
            ConfigOptions.key("password")
                    .stringType()
                    .noDefaultValue()
                    .withDescription("the mqtt's connect password.");

    public static final ConfigOption<String> TOPIC =
            ConfigOptions.key("topic")
                    .stringType()
                    .noDefaultValue()
                    .withDescription("the mqtt's connect topic.");

    public static final ConfigOption<String> CLIENTID =
            ConfigOptions.key("clientid")
                    .stringType()
                    .noDefaultValue()
                    .withDescription("the mqtt's connect clientId.");
}
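Given the options declared by this factory, a table definition that selects the connector could look like the statement produced by the sketch below. The helper only assembles the DDL text with the JDK, and a job would then pass it to TableEnvironment.executeSql. The class MqttDdlSketch, the table name, the columns, the broker address, the credentials, and the json format are all assumptions made for illustration. Only the option keys (connector, hosturl, topic, username, password, clientid, format) come from the factory above.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

/** Assembles a CREATE TABLE statement for the mqtt connector (hypothetical helper). */
final class MqttDdlSketch {

    /** Renders the statement; values are not escaped, so they must not contain single quotes. */
    static String createTableDdl(String table, String columns, Map<String, String> options) {
        StringJoiner with = new StringJoiner(",\n  ", "WITH (\n  ", "\n)");
        for (Map.Entry<String, String> option : options.entrySet()) {
            with.add("'" + option.getKey() + "' = '" + option.getValue() + "'");
        }
        return "CREATE TABLE " + table + " (\n  " + columns + "\n) " + with;
    }

    /** Example values only; adjust them to your broker and payload schema. */
    static String example() {
        Map<String, String> options = new LinkedHashMap<>();
        options.put("connector", "mqtt");                // must equal factoryIdentifier()
        options.put("hosturl", "tcp://localhost:1883");  // required
        options.put("topic", "sensors");                 // required
        options.put("username", "demo");                 // optional
        options.put("password", "demo");                 // optional
        options.put("clientid", "flink-demo");           // optional
        options.put("format", "json");                   // required, resolved via FactoryUtil.FORMAT
        return createTableDdl("mqtt_source", "id STRING,\n  temperature DOUBLE", options);
    }

    public static void main(String[] args) {
        System.out.println(example());
    }
}
```

Using the json format additionally requires the corresponding format dependency on the classpath, which is outside the scope of this sketch.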
2. MqttDynamicTableSource
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.format.DecodingFormat;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.connector.source.ScanTableSource;
import org.apache.flink.table.connector.source.SourceFunctionProvider;
import org.apache.flink.table.data.RowData;
import org.apache.flink.types.RowKind;

public class MqttDynamicTableSource implements ScanTableSource {
    private final ReadableConfig options;
    private final TableSchema schema;
    private final DecodingFormat<DeserializationSchema<RowData>> decodingFormat;

    public MqttDynamicTableSource(ReadableConfig options, DecodingFormat<DeserializationSchema<RowData>> decodingFormat, TableSchema schema) {
        this.options = options;
        this.decodingFormat = decodingFormat;
        this.schema = schema;
    }

    @Override
    public ChangelogMode getChangelogMode() {
        // Declares every row kind; a source whose format only produces inserts
        // could return decodingFormat.getChangelogMode() instead.
        return ChangelogMode.newBuilder()
                .addContainedKind(RowKind.INSERT)
                .addContainedKind(RowKind.UPDATE_BEFORE)
                .addContainedKind(RowKind.UPDATE_AFTER)
                .addContainedKind(RowKind.DELETE)
                .build();
    }

    @Override
    public ScanRuntimeProvider getScanRuntimeProvider(ScanContext ctx) {
        // Create the runtime deserializer for the physical row type of the table.
        final DeserializationSchema<RowData> deserializer = decodingFormat.createRuntimeDecoder(
                ctx,
                schema.toPhysicalRowDataType());
        final SourceFunction<RowData> sourceFunction = new MqttSourceFunction(options, deserializer);
        // false: the source is unbounded.
        return SourceFunctionProvider.of(sourceFunction, false);
    }

    @Override
    public DynamicTableSource copy() {
        return new MqttDynamicTableSource(options, decodingFormat, schema);
    }

    @Override
    public String asSummaryString() {
        return "Mqtt Table Source";
    }
}
3. MqttSourceFunction
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.table.data.RowData;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import java.io.ByteArrayOutputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import static org.apache.flink.table.connector.MqttDynamicTableSourceFactory.*;

public class MqttSourceFunction extends RichSourceFunction<RowData> {
    private final ReadableConfig confs;
    private final DeserializationSchema<RowData> deserializer;
    // Hands records from the MQTT callback thread over to the run() loop.
    private final BlockingQueue<RowData> queue = new ArrayBlockingQueue<>(10);
    // Created in connect() at runtime, so it is never serialized with the function.
    private transient MqttClient client;
    // Cleared by cancel() so that run() and the reconnect loop can exit.
    private volatile boolean running = true;

    public MqttSourceFunction(ReadableConfig options, DeserializationSchema<RowData> deserializer) {
        this.confs = options;
        this.deserializer = deserializer;
    }

    private void connect() throws MqttException {
        // 'clientid' is optional; fall back to a generated id when it is not set.
        String clientId = confs.getOptional(CLIENTID).orElseGet(MqttClient::generateClientId);
        client = new MqttClient(confs.get(HOSTURL), clientId, new MemoryPersistence());
        MqttConnectOptions options = new MqttConnectOptions();
        options.setCleanSession(false);
        // 'username' and 'password' are optional, so only set them when present.
        confs.getOptional(USERNAME).ifPresent(options::setUserName);
        confs.getOptional(PASSWORD).ifPresent(p -> options.setPassword(p.toCharArray()));
        options.setConnectionTimeout(30);
        options.setKeepAliveInterval(20);
        String[] topics = confs.get(TOPIC).split(",");
        int[] qos = new int[topics.length]; // QoS 0 for every topic
        client.setCallback(new MsgCallback(client, options, topics, qos));
        try {
            client.connect(options);
            client.subscribe(topics, qos);
            System.out.println("MQTT connected: " + clientId + ":" + client);
        } catch (MqttException e) {
            System.out.println("MQTT connection failed: " + e);
            throw e; // fail the task instead of idling without a connection
        }
    }

    class MsgCallback implements MqttCallback {
        private final MqttClient client;
        private final MqttConnectOptions options;
        private final String[] topic;
        private final int[] qos;

        public MsgCallback(MqttClient client, MqttConnectOptions options, String[] topic, int[] qos) {
            this.client = client;
            this.options = options;
            this.topic = topic;
            this.qos = qos;
        }

        @Override
        public void connectionLost(Throwable throwable) {
            System.out.println("MQTT connection lost, reconnecting");
            // Retry once per second until reconnected or the source is cancelled.
            while (running) {
                try {
                    Thread.sleep(1000);
                    client.connect(options);
                    client.subscribe(topic, qos);
                    System.out.println("MQTT reconnected: " + client);
                    break;
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }

        @Override
        public void messageArrived(String s, MqttMessage message) throws Exception {
            RowData row = deserializer.deserialize(message.getPayload());
            if (row != null) {
                queue.put(row); // blocks when the queue is full
            }
        }

        public byte[] getBytesFromObject(Serializable obj) throws Exception {
            if (obj == null) {
                return null;
            }
            ByteArrayOutputStream bo = new ByteArrayOutputStream();
            // Closing the stream flushes buffered bytes before they are read back.
            try (ObjectOutputStream oo = new ObjectOutputStream(bo)) {
                oo.writeObject(obj);
            }
            return bo.toByteArray();
        }

        @Override
        public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
        }
    }

    @Override
    public void run(SourceContext<RowData> ctx) throws Exception {
        connect();
        while (running) {
            // Poll with a timeout so the loop notices cancel() even when no messages arrive.
            RowData row = queue.poll(100, TimeUnit.MILLISECONDS);
            if (row != null) {
                ctx.collect(row);
            }
        }
    }

    @Override
    public void cancel() {
        running = false;
        try {
            if (client != null) {
                client.disconnect();
            }
        } catch (Throwable t) {
            // Ignore errors while shutting down.
        }
    }
}
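4. META-INF/services
Flink SQL loads custom connectors through Java's SPI mechanism. Following the SPI conventions, create a META-INF/services directory under the resources directory, add a file named org.apache.flink.table.factories.Factory to it, and put the fully qualified class name of the factory in that file.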
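For this example the file would be src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory. The package name below is an assumption taken from the static import used in MqttSourceFunction (org.apache.flink.table.connector); use the package your factory class actually lives in.

```text
org.apache.flink.table.connector.MqttDynamicTableSourceFactory
```

Each line of the file names one factory class, so a module that ships several factories lists them one per line.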
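IV. Packaging (fat jar)
Flink uses Java's Service Provider Interfaces (SPI) mechanism to load table connector/format factories by their identifier. Because the SPI resource file of every table connector/format has the same name, org.apache.flink.table.factories.Factory, and sits in the same directory, META-INF/services, these resource files overwrite each other when you build the uber jar of a project that uses several table connectors/formats, and Flink then fails to load the factory classes correctly. In this situation the recommended approach is to transform the resource files under META-INF/services with the ServicesResourceTransformer of the maven shade plugin.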
plugin | function |
---|---|
maven-jar-plugin | Maven's default packaging plugin; creates the project jar |
maven-shade-plugin | Builds an executable (fat) jar |
maven-assembly-plugin | Supports customized packaging, for example the way Apache projects are packaged |
Once the ServicesResourceTransformer is configured, the resource files under META-INF/services are merged rather than overwritten when the project builds its uber jar.
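The difference between overwriting and merging can be shown with a small JDK-only model. It only mimics the effect on the entries of same-named service files and is not the shade plugin's implementation. The class ServiceFileMergeSketch and the second factory name (com.example.OtherFactory) are hypothetical, and removing duplicate entries is a simplification of this sketch.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

/** Models what happens to same-named service files in an uber jar (hypothetical sketch). */
final class ServiceFileMergeSketch {

    /** Without a transformer: only one file with a given path survives (here, the last one). */
    static List<String> overwrite(List<List<String>> files) {
        return files.isEmpty() ? new ArrayList<>() : new ArrayList<>(files.get(files.size() - 1));
    }

    /** With merging: every distinct entry from every file is kept, in first-seen order. */
    static List<String> merge(List<List<String>> files) {
        Set<String> merged = new LinkedHashSet<>();
        for (List<String> file : files) {
            merged.addAll(file);
        }
        return new ArrayList<>(merged);
    }

    public static void main(String[] args) {
        List<String> mqttJar = Arrays.asList("org.apache.flink.table.connector.MqttDynamicTableSourceFactory");
        List<String> otherJar = Arrays.asList("com.example.OtherFactory"); // hypothetical second connector
        List<List<String>> files = Arrays.asList(mqttJar, otherJar);

        System.out.println("overwrite: " + overwrite(files));
        System.out.println("merge:     " + merge(files));
    }
}
```

In the overwrite case the mqtt factory entry is lost, which corresponds to Flink being unable to find a factory for the identifier at runtime.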
<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-shade-plugin</artifactId>
            <version>3.1.1</version>
            <executions>
                <execution>
                    <id>shade</id>
                    <phase>package</phase>
                    <goals>
                        <goal>shade</goal>
                    </goals>
                    <configuration>
                        <transformers combine.children="append">
                            <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
                        </transformers>
                    </configuration>
                </execution>
            </executions>
        </plugin>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <configuration>
                <source>8</source>
                <target>8</target>
            </configuration>
        </plugin>
    </plugins>
</build>