Flink流式SQL读取CSV文件
不给依赖的都是流氓!
<properties>
<flink.version>1.13.0</flink.version>
</properties>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_2.11</artifactId>
<version>1.13.6</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-csv</artifactId>
<version>1.13.6</version>
</dependency>
将 CSV 文件放在与 src 同级的 input 文件夹下(本例为 input/CPSSW8.csv),就可以愉快地使用 SQL 了
package org.example.wc;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
/**
 * Demo job: registers a CSV file as a Flink SQL table, runs a filter query
 * against it and prints the selected rows.
 */
public class test {

    /**
     * DDL that declares {@code csv_table} using the {@code filesystem} connector
     * and the {@code csv} format, reading from {@code input/CPSSW8.csv}.
     */
    private static final String CREATE_CSV_TABLE_DDL =
            "CREATE TABLE csv_table (\n"
                    + " id STRING,\n"
                    + " earnings DOUBLE,\n"
                    + " gender STRING,"
                    + " age INT,"
                    + " region string,"
                    + " education INT \n"
                    + ") WITH (\n"
                    + " 'connector'='filesystem',\n"
                    + " 'path'='input/CPSSW8.csv',\n"
                    + " 'format'='csv'\n"
                    + ")";

    /** Query that keeps only the id and earnings of rows whose earnings exceed 25. */
    private static final String EARNINGS_QUERY =
            "select id, earnings from csv_table where earnings > 25";

    /**
     * Entry point: builds the streaming and table environments, registers the
     * CSV-backed table, converts the query result to a DataStream and prints it.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if job execution fails
     */
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment streamEnv =
                StreamExecutionEnvironment.getExecutionEnvironment();
        // Run the job with parallelism 1.
        streamEnv.setParallelism(1);

        final StreamTableEnvironment tables = StreamTableEnvironment.create(streamEnv);

        // Register the table definition before querying it.
        tables.executeSql(CREATE_CSV_TABLE_DDL);

        final Table filteredRows = tables.sqlQuery(EARNINGS_QUERY);
        tables.toDataStream(filteredRows).print();

        // The print sink is attached to the stream environment, so the job is launched from there.
        streamEnv.execute();
    }
}
参考:https://nightlies.apache.org/flink/flink-docs-release-1.13/zh/
|