参考:https://github.com/lixz3321/flink-connector-jdbc-ext
下载:flink-release-1.13.2 源码 进入flink-connectors module 子module flink-connector-jdbc,添加 PhoenixDialect.java ,PhoenixRowConverter.java,修改 JdbcDialects.java
mvn clean package -DskipTests -Dfast -Drat.skip=true
替代 flink-connector-jdbc
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-jdbc-ext</artifactId>
<version>1.13.2</version>
</dependency>
PhoenixDialect.java(可参照 PostgresDialect.java 编写)
org.apache.flink.connector.jdbc.dialect.PhoenixDialect
/**
 * JDBC dialect for Apache Phoenix.
 *
 * <p>Selected for URLs that start with {@code jdbc:phoenix:}. Phoenix writes rows with
 * {@code UPSERT INTO} rather than {@code INSERT INTO}, so the insert statement is overridden
 * accordingly. Identifiers are wrapped in double quotes, which Phoenix treats as case-sensitive,
 * so Flink column names must match the Phoenix column names exactly (including case).
 */
public class PhoenixDialect extends AbstractDialect {

    // Dialects are shipped to the cluster via Java serialization; pin the UID explicitly.
    private static final long serialVersionUID = 1L;

    // Phoenix documents DECIMAL as a fixed-precision type with at most 38 digits.
    private static final int MAX_DECIMAL_PRECISION = 38;
    private static final int MIN_DECIMAL_PRECISION = 1;

    // Phoenix documents TIMESTAMP as yyyy-MM-dd hh:mm:ss[.nnnnnnnnn], i.e. up to 9 fractional
    // digits. The lower bound stays 0 so that everything accepted before is still accepted.
    private static final int MAX_TIMESTAMP_PRECISION = 9;
    private static final int MIN_TIMESTAMP_PRECISION = 0;

    /** @return the largest DECIMAL precision accepted by schema validation */
    @Override
    public int maxDecimalPrecision() {
        return MAX_DECIMAL_PRECISION;
    }

    /** @return the smallest DECIMAL precision accepted by schema validation */
    @Override
    public int minDecimalPrecision() {
        return MIN_DECIMAL_PRECISION;
    }

    /** @return the largest TIMESTAMP precision accepted by schema validation */
    @Override
    public int maxTimestampPrecision() {
        return MAX_TIMESTAMP_PRECISION;
    }

    /** @return the smallest TIMESTAMP precision accepted by schema validation */
    @Override
    public int minTimestampPrecision() {
        return MIN_TIMESTAMP_PRECISION;
    }

    /**
     * Returns the logical type roots this dialect rejects.
     *
     * <p>No type is excluded yet. An empty list (never {@code null}) is returned so callers can
     * safely invoke {@code contains(...)} on the result.
     * TODO(review): enumerate the Flink types Phoenix genuinely cannot map.
     */
    @Override
    public List<LogicalTypeRoot> unsupportedTypes() {
        return java.util.Collections.emptyList();
    }

    /** @return the human-readable dialect name, {@code "Phoenix"} */
    @Override
    public String dialectName() {
        return "Phoenix";
    }

    /**
     * @param url the JDBC URL configured on the table
     * @return {@code true} when the URL uses the {@code jdbc:phoenix:} scheme
     */
    @Override
    public boolean canHandle(String url) {
        return url.startsWith("jdbc:phoenix:");
    }

    /**
     * @param rowType the logical row type of the table
     * @return a {@link PhoenixRowConverter} for that row type
     */
    @Override
    public JdbcRowConverter getRowConverter(RowType rowType) {
        return new PhoenixRowConverter(rowType);
    }

    /**
     * Builds the clause appended to a SELECT when a row limit is requested.
     *
     * @param limit maximum number of rows to return
     * @return a Phoenix {@code LIMIT n} clause (previously {@code null}, which would have been
     *     rendered as the text "null" if concatenated into a query)
     */
    @Override
    public String getLimitClause(long limit) {
        return "LIMIT " + limit;
    }

    /** @return the Phoenix thick-client JDBC driver class name */
    @Override
    public Optional<String> defaultDriverName() {
        return Optional.of("org.apache.phoenix.jdbc.PhoenixDriver");
    }

    /**
     * @param identifier a table or column name
     * @return the identifier wrapped in double quotes (case-sensitive in Phoenix)
     */
    @Override
    public String quoteIdentifier(String identifier) {
        return "\"" + identifier + "\"";
    }

    /**
     * Builds the write statement. Phoenix has no INSERT, so {@code UPSERT INTO} is used.
     *
     * @param tableName target table name
     * @param fieldNames column names, in statement order
     * @return {@code UPSERT INTO "table"("c1", "c2") VALUES (:c1, :c2)}
     */
    @Override
    public String getInsertIntoStatement(String tableName, String[] fieldNames) {
        String columns =
                Arrays.stream(fieldNames)
                        .map(this::quoteIdentifier)
                        .collect(Collectors.joining(", "));
        // Named placeholders (":field"); presumably resolved by Flink's named-parameter
        // statement wrapper before reaching the driver - confirm against the connector.
        String placeholders =
                Arrays.stream(fieldNames).map(f -> ":" + f).collect(Collectors.joining(", "));
        return "UPSERT INTO "
                + quoteIdentifier(tableName)
                + "("
                + columns
                + ")"
                + " VALUES ("
                + placeholders
                + ")";
    }
}
PhoenixRowConverter.java
org.apache.flink.connector.jdbc.internal.converter.PhoenixRowConverter
/**
 * Row converter used by the Phoenix dialect.
 *
 * <p>It overrides nothing except {@link #converterName()}, so all value conversions are the
 * ones inherited from {@code AbstractJdbcRowConverter}.
 */
public class PhoenixRowConverter extends AbstractJdbcRowConverter {

    // Row converters are shipped to the cluster via Java serialization; pin the UID explicitly.
    private static final long serialVersionUID = 1L;

    /**
     * @param rowType the logical row type whose fields are converted
     */
    public PhoenixRowConverter(RowType rowType) {
        super(rowType);
    }

    /** @return the converter name, {@code "Phoenix"} */
    @Override
    public String converterName() {
        return "Phoenix";
    }
}
JdbcDialects.java
org.apache.flink.connector.jdbc.dialect.JdbcDialects
// Dialect instances known to the JDBC connector. PhoenixDialect is the entry added for this
// extension so that jdbc:phoenix: URLs can be served.
// NOTE(review): presumably the lookup returns the first dialect whose canHandle(url) is true -
// confirm in the rest of JdbcDialects.
private static final List<JdbcDialect> DIALECTS =
Arrays.asList(
new DerbyDialect(),
new MySQLDialect(),
new PostgresDialect(),
new PhoenixDialect());
======
// Create a streaming table environment on the Blink planner.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings bsSettings =
        EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, bsSettings);
System.out.println("tEnv--> " + tEnv);

// Map the Phoenix table WHO_GOODS_LABEL to the Flink table test4 through the jdbc connector.
// Column names are upper-case here, matching the case used on the Phoenix side.
String createTest4Ddl =
        String.join(
                "",
                "create table test4(",
                "`GOODS_ID` STRING,",
                "`ISWHITE` STRING,",
                "`DATA_DATE` STRING",
                ")WITH(",
                "'connector' = 'jdbc',",
                "'url' = 'jdbc:phoenix:172.16.34.121:2181',",
                "'table-name' = 'WHO_GOODS_LABEL'",
                ")");
tEnv.executeSql(createTest4Ddl);

// Read the whole table back and print the result to stdout.
tEnv.executeSql("select * from test4").print();
值得注意的是:如下图所示,aaa / ccc 两条数据没有出现在上图的查询结果中,因为它们是在 hbase shell 中直接添加的,所以查询不到。
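注意字段大小写:PhoenixDialect.quoteIdentifier 会给标识符加上双引号,而 Phoenix 对带双引号的标识符区分大小写,所以 Flink 表中的字段名必须与 Phoenix 表的列名大小写完全一致。例如下面的建表语句把 GOODS_ID 写成了小写的 goods_id,该字段很可能无法与 Phoenix 中的列对应上: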
-- Case-sensitivity example: the Phoenix dialect double-quotes identifiers, so column names
-- are matched case-sensitively. `goods_id` is deliberately lower-case here and will
-- presumably not resolve against an upper-case GOODS_ID column in Phoenix.
create table test4(
`goods_id` STRING,
`ISWHITE` STRING,
`DATA_DATE` STRING
)WITH(
'connector' = 'jdbc',
'url' = 'jdbc:phoenix:172.16.34.121:2181',
'table-name' = 'WHO_GOODS_LABEL'
)
|