/**
 * Flink streaming job that continuously inserts rows read from {@code kafka_table}
 * into {@code hive_table}, adding two columns derived from {@code ts}: the day
 * ({@code yyyy-MM-dd}) and the hour ({@code HH}).
 *
 * <p>This class does not create either table; both must already be resolvable
 * through the Hive catalog that is registered and selected here.
 */
public class Kafka2Hive {

    /** Name under which the Hive catalog is registered and then made current. */
    private static final String CATALOG_NAME = "myhive";

    /** Database the Hive catalog uses by default. */
    private static final String DEFAULT_DATABASE = "mydatabase";

    /**
     * Directory handed to {@code HiveCatalog} as its Hive configuration directory.
     * NOTE(review): this is a machine-specific Windows path; adjust per environment.
     */
    private static final String HIVE_CONF_DIR = "F:\\flink-demo\\src\\main\\resources";

    /** Checkpoint interval passed to {@code enableCheckpointing}, in milliseconds. */
    private static final long CHECKPOINT_INTERVAL_MS = 10000L;

    /**
     * Builds the table environment, registers the Hive catalog and submits the
     * {@code INSERT INTO hive_table ... FROM kafka_table} statement.
     *
     * @param args command-line arguments; not used
     */
    public static void main(String[] args) {
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        // Per the Flink streaming-sink documentation, files written by the streaming
        // Hive/filesystem sink are finalized on checkpoints, so checkpointing is enabled.
        environment.enableCheckpointing(CHECKPOINT_INTERVAL_MS);
        environment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().build();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(environment, settings);

        // Register and select the catalog under one shared constant so the name used
        // for construction, registration and selection cannot drift apart.
        HiveCatalog hive = new HiveCatalog(CATALOG_NAME, DEFAULT_DATABASE, HIVE_CONF_DIR);
        tableEnv.registerCatalog(CATALOG_NAME, hive);
        tableEnv.useCatalog(CATALOG_NAME);

        // The statement below runs under the default dialect. The original code set the
        // HIVE dialect and immediately overwrote it with DEFAULT; that dead call is removed.
        tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);

        // Each fragment ends with a space so the concatenated SQL keeps its clauses
        // separated (the original produced "...'HH')FROM kafka_table").
        tableEnv.executeSql(
                "INSERT INTO hive_table "
                        + "SELECT user_id, order_amount, DATE_FORMAT(ts, 'yyyy-MM-dd'), DATE_FORMAT(ts, 'HH') "
                        + "FROM kafka_table");
    }
}
在 hive-site.xml 中添加以下配置：
<property>
<name>hive.metastore.uris</name>
<value>thrift://hadoop01:9083</value>
</property>
启动 Hive Metastore 服务：hive --service metastore
测试数据
1,10,1625886660000
2,3,1625886721000
1,5,1625887380000
1,6,1625887800000
2,30,1625886721000
1,4,1625889989000
|