需求分析与思路:
关键词主题主要用于大屏展示中的字符云（词云）效果，让大屏观看者直观地感知当前用户更关心哪些商品和关键词。关键词的展示也是一种维度聚合的结果：根据聚合值的大小来决定关键词显示的大小。关键词最重要的来源是用户在搜索栏中的搜索，另外也可以从以商品为主题的统计中获取关键词。
IK 分词器的使用
无论是用户在搜索栏中输入的内容，还是商品名称，文本都可能比较长，且由多个关键词组成，所以我们需要把长文本分割成一个一个的词。这种分词技术在搜索引擎中也会用到。对于中文分词，现在的搜索引擎基本上都使用第三方分词器，我们在计算数据时也可以使用与搜索引擎一致的分词器——IK。
/**
 * Flink job that derives per-keyword counts from search activity.
 *
 * <p>Stages, in order:
 * <ol>
 *   <li>register the Kafka-backed {@code page_view} table;</li>
 *   <li>keep page logs whose {@code last_page_id} is {@code search} and that carry an {@code item};</li>
 *   <li>split each {@code item} text into words with the {@code split_words} table function;</li>
 *   <li>count each word per 10-second tumbling event-time window;</li>
 *   <li>print the results and write them to ClickHouse.</li>
 * </ol>
 */
public class KeywordStatsApp {

    /** Kafka consumer group id passed to the source DDL. */
    private static final String GROUP_ID = "keyword_stats_app";

    /** Kafka topic the page-view table reads from. */
    private static final String PAGE_VIEW_SOURCE_TOPIC = "dwd_page_log";

    /** Name under which {@link SplitFunction} is registered and referenced from SQL. */
    private static final String SPLIT_FUNCTION_NAME = "split_words";

    /**
     * Parameterized ClickHouse insert. NOTE(review): presumably ClickHouseUtil binds the
     * placeholders from KeywordStats fields in declaration order — verify the column order
     * here matches that class.
     */
    private static final String CLICKHOUSE_INSERT_SQL =
            "insert into keyword_stats_210325(keyword,ct,source,stt,edt,ts) values(?,?,?,?,?,?)";

    /** Name given to the submitted Flink job. */
    private static final String JOB_NAME = "KeywordStatsApp";

    /**
     * Builds and submits the keyword statistics pipeline.
     *
     * @param args unused
     * @throws Exception if job submission or execution fails
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        streamEnv.setParallelism(1);
        StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(streamEnv);

        registerPageViewSource(tableEnvironment);
        Table searchTexts = selectSearchTexts(tableEnvironment);
        Table splitWords = explodeIntoWords(tableEnvironment, searchTexts);
        Table windowedCounts = countWordsPerWindow(tableEnvironment, splitWords);

        // NOTE(review): presumably maps result columns onto KeywordStats fields by name — confirm.
        DataStream<KeywordStats> statsStream =
                tableEnvironment.toAppendStream(windowedCounts, KeywordStats.class);
        statsStream.print();
        statsStream.addSink(ClickHouseUtil.getSink(CLICKHOUSE_INSERT_SQL));

        streamEnv.execute(JOB_NAME);
    }

    /**
     * Registers the {@code page_view} table over the Kafka page-log topic.
     *
     * <p>{@code ts} is divided by 1000 before {@code FROM_UNIXTIME}, so it is treated as epoch
     * milliseconds and sub-second precision is dropped from {@code rt}. The watermark tolerates
     * events up to one second out of order.
     */
    private static void registerPageViewSource(StreamTableEnvironment tEnv) {
        String sourceDdl = "create table page_view( " +
                " `common` Map<STRING,STRING>, " +
                " `page` Map<STRING,STRING>, " +
                " `ts` BIGINT, " +
                " `rt` as TO_TIMESTAMP(FROM_UNIXTIME(ts/1000)), " +
                " WATERMARK FOR rt AS rt - INTERVAL '1' SECOND " +
                ") with (" + MyKafkaUtil.getKafkaDDL(PAGE_VIEW_SOURCE_TOPIC, GROUP_ID) + ")";
        tEnv.executeSql(sourceDdl);
    }

    /** Selects the raw {@code item} text (as {@code full_word}) of pages reached from search. */
    private static Table selectSearchTexts(StreamTableEnvironment tEnv) {
        String query = "select " +
                " page['item'] full_word, " +
                " rt " +
                "from " +
                " page_view " +
                "where " +
                " page['last_page_id']='search' and page['item'] is not null";
        return tEnv.sqlQuery(query);
    }

    /**
     * Registers the split function, then joins each text row with the words it yields.
     *
     * <p>The function must be registered before the query that references it is parsed.
     * Concatenating {@code texts} into the SQL relies on {@code Table.toString()} registering
     * it under a generated name.
     */
    private static Table explodeIntoWords(StreamTableEnvironment tEnv, Table texts) {
        tEnv.createTemporarySystemFunction(SPLIT_FUNCTION_NAME, SplitFunction.class);
        String query = "SELECT " +
                " word, " +
                " rt " +
                "FROM " +
                " " + texts + ", LATERAL TABLE(split_words(full_word))";
        return tEnv.sqlQuery(query);
    }

    /**
     * Counts occurrences of each word per 10-second tumbling window on {@code rt}.
     *
     * <p>Window bounds are formatted as {@code yyyy-MM-dd HH:mm:ss} strings. {@code ts} is the
     * current time from {@code UNIX_TIMESTAMP()} scaled by 1000.
     */
    private static Table countWordsPerWindow(StreamTableEnvironment tEnv, Table words) {
        String query = "select " +
                " 'search' source, " +
                " DATE_FORMAT(TUMBLE_START(rt, INTERVAL '10' SECOND), 'yyyy-MM-dd HH:mm:ss') stt, " +
                " DATE_FORMAT(TUMBLE_END(rt, INTERVAL '10' SECOND), 'yyyy-MM-dd HH:mm:ss') edt, " +
                " word keyword, " +
                " count(*) ct, " +
                " UNIX_TIMESTAMP()*1000 ts " +
                "from " + words + " " +
                "group by " +
                " word, " +
                " TUMBLE(rt, INTERVAL '10' SECOND)";
        return tEnv.sqlQuery(query);
    }
}
代码流程图:
|