-- Spark data source: OTS (Aliyun Tablestore).
-- Source table: group-chat messages in OTS/Tablestore, read through the search index.
-- Named `source` so the DROP here and the `FROM source` in the INSERT below both
-- resolve (the original DDL created `group_chat_source`, which nothing referenced).
DROP TABLE IF EXISTS source;
CREATE TABLE source (
    targetId STRING,
    targetType LONG,
    sendTime LONG COMMENT "group_chat_source"  -- assumes epoch milliseconds; the WHERE clause multiplies day boundaries by 1000 — confirm
)
USING tablestore
OPTIONS (
    endpoint = '${source_endpoint}',
    access.key.id = '${source_access_key}',
    access.key.secret = '${source_access_secret}',
    instance.name = '${source_instance}',
    table.name = 'test_sou_yuntusuo_index_chat',
    search.index.name = 'test_sou_yuntusuo_index_chat_index',
    max.split.count = 64,
    -- Push the long-typed sendTime range predicate down to the search index.
    push.down.range.long = true,
    push.down.range.string = false
);
-- Dimension table: group info (display name keyed by id_md5) in OTS/Tablestore.
-- Named `dim` so the DROP here and the `LEFT JOIN dim` in the INSERT below both
-- resolve (the original DDL created `group_info_dim`, which the DROP never hit).
DROP TABLE IF EXISTS dim;
CREATE TABLE dim (
    id_md5 STRING,
    targetName STRING COMMENT "table"
)
USING tablestore
OPTIONS (
    endpoint = '${dim_endpoint}',
    access.key.id = '${dim_access_key}',
    access.key.secret = '${dim_access_secret}',
    instance.name = '${dim_instance}',
    table.name = 'xxx',
    search.index.name = 'xxx_index',
    max.split.count = 64,
    push.down.range.long = true,
    push.down.range.string = false
);
-- Sink table: daily per-group aggregates written back to OTS/Tablestore.
-- Named `sink` so the DROP here and the `INSERT INTO sink` below both resolve
-- (the original DDL created `group_chat_sink`, which nothing referenced).
-- Schema comes from the connector's `catalog` JSON, not from a column list.
DROP TABLE IF EXISTS sink;
CREATE TABLE sink
USING tablestore
OPTIONS (
    endpoint = '${sink_endpoint}',
    access.key.id = '${sink_access_key}',
    access.key.secret = '${sink_access_secret}',
    instance.name = '${sink_instance}',
    table.name = 'xxx1',
    catalog = '{"columns": {
        "id_md5": {"col": "id_md5", "type": "string"},
        "targetId": {"col": "targetId", "type": "string"},
        "targetName": {"col": "targetName", "type": "string"},
        "count": {"col": "count", "type": "long"},
        "last": {"col": "last", "type": "long"},
        "date": {"col": "date", "type": "string"}}}'
);
-- Daily aggregation: per-group message count and latest send time for group
-- chats (targetType = 3), joined to the dimension table for the display name.
-- Fixes vs. original:
--   * join referenced `group_info_dim.id_md5`, but FROM only introduces `dim`
--     — unresolvable identifier; now qualified as `dim.id_md5`.
--   * time window was open on both ends (`>` start); events stamped exactly at
--     the start-of-day boundary were dropped. Now half-open: [start, end).
-- NOTE(review): joins raw targetId to a column named id_md5 — assumes the dim
-- key really is the plain targetId despite the _md5 suffix; confirm upstream.
INSERT INTO sink
SELECT
    uuid() AS id_md5,                  -- surrogate row key (non-deterministic)
    source.targetId,
    dim.targetName AS targetName,      -- NULL when the group has no dim row (LEFT JOIN)
    count(*) AS count,
    max(source.sendTime) AS last,      -- latest send time in the window, epoch ms
    STRING(${start_time}) AS date      -- partition label, 'yyyyMMdd'
FROM source
LEFT JOIN dim
    ON source.targetId = dim.id_md5
WHERE source.targetType = 3
  AND source.sendTime >= unix_timestamp(STRING(${start_time}), 'yyyyMMdd') * 1000
  AND source.sendTime <  unix_timestamp(STRING(${end_time}), 'yyyyMMdd') * 1000
GROUP BY source.targetId, dim.targetName;