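Flink SQL Development
Notes on some unusual patterns from day-to-day development, including querying tables and creating views, kept for later reference.
1. Querying tables
1.1 Defining a dimension table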
CREATE TABLE dim_etf_info (
    id BIGINT,
    fund_code VARCHAR,
    underlying_security_id VARCHAR, -- join key in 1.2
    component_share INT,
    creation_redemption_unit DECIMAL(18, 4),
    estimate_cash_component DECIMAL(18, 4),
    creation_cash_substitute DECIMAL(18, 4),
    substitute_flag VARCHAR -- selected in the 1.2 join
) WITH (
    'connector' = 'jdbc',
    'url' = '****',
    'table-name' = '******',
    'username' = '***',
    'password' = '***'
);
The same DDL embedded in a Java string for the Table API:
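String dimTable = "CREATE TABLE dim_etf_info (\n" +
        "  id BIGINT,\n" +
        "  fund_code VARCHAR,\n" +
        "  underlying_security_id VARCHAR,\n" +
        "  component_share INT,\n" +
        "  creation_redemption_unit DECIMAL(18,4),\n" +
        "  estimate_cash_component DECIMAL(18,4),\n" +
        "  creation_cash_substitute DECIMAL(18,4),\n" +
        "  substitute_flag VARCHAR\n" +
        ") WITH (\n" +
        "  'connector' = 'jdbc',\n" +
        "  'url' = '****',\n" +
        "  'table-name' = '******',\n" +
        "  'username' = '***',\n" +
        "  'password' = '***'\n" + // no trailing comma after the last option
        ")";
// Register the table; tableEnv is the TableEnvironment used in 1.2.
tableEnv.executeSql(dimTable);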
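A trailing comma after the last option makes the WITH clause invalid, and it is an easy mistake when the DDL is assembled from string fragments. One way to avoid it is to render the options from a map. The following is only a sketch: WithClause and render are hypothetical names, not part of Flink, and the option values are the same masked placeholders used above.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical helper (not a Flink API): renders connector options as a WITH clause.
final class WithClause {
    private WithClause() {}

    // Joins 'key' = 'value' pairs with ",\n", so the last entry never gets a trailing comma.
    // Single quotes inside values are doubled, the standard SQL escape.
    static String render(Map<String, String> options) {
        return options.entrySet().stream()
                .map(e -> "  '" + e.getKey() + "' = '" + e.getValue().replace("'", "''") + "'")
                .collect(Collectors.joining(",\n", "WITH (\n", "\n)"));
    }

    public static void main(String[] args) {
        // LinkedHashMap keeps the options in insertion order.
        Map<String, String> options = new LinkedHashMap<>();
        options.put("connector", "jdbc");
        options.put("url", "****");          // masked placeholder, as in the notes
        options.put("table-name", "******"); // masked placeholder, as in the notes
        System.out.println(render(options));
    }
}
```

The rendered text can be appended to the column list of the CREATE TABLE statement before it is passed to tableEnv.executeSql.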
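1.2 Dimension table join
This is a lookup join: FOR SYSTEM_TIME AS OF filterTable.proctime looks up the dimension row at each record's processing time, so filterTable must expose a processing-time attribute named proctime.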
// filterTable and dimEtfInfo are spliced into the SQL text as table names.
Table joinTable = tableEnv.sqlQuery(
        "select proctime,prod_code,last_price,component_share,estimate_cash_component," +
        "creation_redemption_unit,substitute_flag,update_time,fund_code,creation_cash_substitute " +
        " from " + filterTable + " as filterTable join " + dimEtfInfo
        + " FOR SYSTEM_TIME AS OF filterTable.proctime as dim on filterTable.prod_code=dim.underlying_security_id"
        + " where fund_code='" + fundCode + "'");
The equivalent SQL, where 'fundCode' stands for the actual fund code value:
SELECT
    proctime,
    prod_code,
    last_price,
    component_share,
    estimate_cash_component,
    creation_redemption_unit,
    substitute_flag,
    update_time,
    fund_code,
    creation_cash_substitute
FROM
    filterTable AS filterTable
    JOIN dimEtfInfo FOR SYSTEM_TIME AS OF filterTable.proctime AS dim
        ON filterTable.prod_code = dim.underlying_security_id
WHERE
    fund_code = 'fundCode'
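Because the fund code is concatenated straight into the query text, a value containing a single quote would break the statement. A minimal sketch of a quoting helper follows, assuming the standard SQL rule that a single quote inside a string literal is escaped by doubling it. SqlLiterals and quote are hypothetical names, and "510300" is only an illustrative value.

```java
// Hypothetical helper (not a Flink API): turns a Java string into a SQL string literal.
final class SqlLiterals {
    private SqlLiterals() {}

    // Wraps the value in single quotes and doubles any embedded single quote.
    static String quote(String value) {
        return "'" + value.replace("'", "''") + "'";
    }

    public static void main(String[] args) {
        String fundCode = "510300"; // illustrative value, not from the original notes
        String where = " where fund_code = " + quote(fundCode);
        System.out.println(where); // prints " where fund_code = '510300'"
    }
}
```

With such a helper, the last fragment of the query string in 1.2 could be written as " where fund_code = " + SqlLiterals.quote(fundCode).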
1.3 Windowing on processing time
A 1-minute tumbling window on processing time, with no watermark involved, computing SUM aggregates.
SELECT
    TUMBLE_START(proctime, INTERVAL '1' MINUTE) AS window_start,
    FIRST_VALUE(fund_code) AS fund_code,
    FIRST_VALUE(estimate_cash_component) AS estimate_cash_component,
    FIRST_VALUE(creation_redemption_unit) AS creation_redemption_unit,
    SUM(last_price) AS sum_price,
    SUM(last_price * component_share) AS sum_cash,
    -- substitute_flag is a grouping key, so each output row carries exactly one flag value.
    -- IF requires three arguments; CASE without ELSE yields NULL for the non-matching flag.
    CASE WHEN substitute_flag = '2' THEN SUM(creation_cash_substitute) END AS sum_creation_cash_substitute,
    CASE WHEN substitute_flag = '1' THEN SUM(last_price * component_share) END AS sum_cash_subscription
FROM
    stockSubstitutionTable
GROUP BY
    TUMBLE(proctime, INTERVAL '1' MINUTE),
    substitute_flag
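To make the window boundaries concrete, the sketch below shows the alignment arithmetic behind a tumbling window, assuming windows are aligned to the epoch with no offset. It illustrates the idea only and is not Flink's implementation; TumbleWindows and windowStart are hypothetical names.

```java
// Hypothetical illustration (not Flink code): start of the tumbling window containing a timestamp.
final class TumbleWindows {
    private TumbleWindows() {}

    // Rounds the timestamp down to the nearest multiple of the window size.
    // floorMod keeps the result correct for negative timestamps as well.
    static long windowStart(long timestampMillis, long sizeMillis) {
        return timestampMillis - Math.floorMod(timestampMillis, sizeMillis);
    }

    public static void main(String[] args) {
        long oneMinute = 60_000L;
        // A record processed at 125 s falls into the window that starts at 120 s.
        System.out.println(windowStart(125_000L, oneMinute)); // prints 120000
    }
}
```

Every record whose processing time maps to the same window start lands in the same 1-minute group, which is then further split by substitute_flag.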
2. Querying views