Top-N
Batch Streaming
Top-N queries ask for the N smallest or largest values ordered by columns. Both the smallest and the largest value sets are considered Top-N queries. Top-N queries are useful when only the N bottom-most or the N top-most records of a batch or streaming table need to be displayed under some condition. The result set can be used for further analysis.
Flink uses the combination of an OVER window clause and a filter condition to express a Top-N query. With the PARTITION BY clause of the OVER window, Flink also supports per-group Top-N, for example the five products per category with the highest sales in real time. Top-N queries are supported for SQL on both batch and streaming tables.
The following shows the syntax of the Top-N statement:
SELECT [column_list]
FROM (
   SELECT [column_list],
     ROW_NUMBER() OVER ([PARTITION BY col1[, col2...]]
       ORDER BY col1 [asc|desc][, col2 [asc|desc]...]) AS rownum
   FROM table_name)
WHERE rownum <= N [AND conditions]
Parameter Specification:
- ROW_NUMBER(): Assigns a unique, sequential number to each row, starting with one, according to the ordering of rows within the partition. Currently, only ROW_NUMBER is supported as the over window function. Support for RANK() and DENSE_RANK() is planned for the future.
- PARTITION BY col1[, col2...]: Specifies the partition columns. Each partition will have its own Top-N result.
- ORDER BY col1 [asc|desc][, col2 [asc|desc]...]: Specifies the ordering columns. The ordering direction can differ from column to column.
- WHERE rownum <= N: The rownum <= N condition is required for Flink to recognize the query as a Top-N query. N is the number of smallest or largest records to retain.
- [AND conditions]: Other conditions may be added to the WHERE clause, but they can only be combined with rownum <= N using an AND conjunction.
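What the pattern above computes can be sketched in plain Python for a finite (batch) input. This only illustrates the semantics and is not how Flink executes the query; the helper name `top_n_per_partition` and the sample rows are hypothetical.

```python
from collections import defaultdict
from operator import itemgetter


def top_n_per_partition(rows, partition_by, order_by, n, descending=True):
    """Emulate ROW_NUMBER() OVER (PARTITION BY ... ORDER BY ...) plus rownum <= n.

    rows is a list of dicts; partition_by and order_by are column names.
    Ties keep their input order, because Python's sort is stable.
    """
    partitions = defaultdict(list)
    for row in rows:
        partitions[row[partition_by]].append(row)

    result = []
    for key in sorted(partitions):
        ordered = sorted(partitions[key], key=itemgetter(order_by), reverse=descending)
        for rownum, row in enumerate(ordered, start=1):
            if rownum > n:  # the "WHERE rownum <= N" filter
                break
            result.append({**row, "rownum": rownum})
    return result


# Hypothetical sample data: two categories, top 2 per category by sales.
sales = [
    {"product_id": "p1", "category": "books", "sales": 30},
    {"product_id": "p2", "category": "books", "sales": 50},
    {"product_id": "p3", "category": "books", "sales": 10},
    {"product_id": "p4", "category": "toys", "sales": 70},
]
top2 = top_n_per_partition(sales, "category", "sales", n=2)
```

Each partition gets its own numbering starting at one, so `top2` holds two rows for `books` and one for `toys`.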
Note: The above pattern must be followed exactly, otherwise the optimizer won't be able to translate the query. The Top-N query is result-updating. Flink SQL sorts the input data stream by the order key, so when the top N records change, the changed records are sent downstream as retraction/update records. It is recommended to use a storage system that supports updates as the sink of a Top-N query. In addition, if the top N records need to be stored in external storage, the result table should have the same unique key as the Top-N query.
The unique key of a Top-N query is the combination of the partition columns and the rownum column. A Top-N query can also derive the unique key of its upstream. Taking the following job as an example, if product_id is the unique key of ShopSales, then the unique keys of the Top-N query are [category, rownum] and [product_id].
The following example shows how to specify a SQL query with Top-N on a streaming table. It computes the "top five products per category that have the maximum sales in real time" mentioned above.
CREATE TABLE ShopSales (
  product_id   STRING,
  category     STRING,
  product_name STRING,
  sales        BIGINT
) WITH (...);

SELECT *
FROM (
  SELECT *,
    ROW_NUMBER() OVER (PARTITION BY category ORDER BY sales DESC) AS row_num
  FROM ShopSales)
WHERE row_num <= 5
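The unique-key statement made earlier for this job can be checked on a small, hand-written result set. The sketch below is plain Python. The rows in `result` are hypothetical output of the query above, assuming product_id is unique in ShopSales, and `is_unique_key` is an illustrative helper, not a Flink API.

```python
def is_unique_key(rows, columns):
    """Return True if no two rows share the same values for the given columns."""
    seen = set()
    for row in rows:
        key = tuple(row[c] for c in columns)
        if key in seen:
            return False
        seen.add(key)
    return True


# Hypothetical Top-N output for two categories.
result = [
    {"product_id": "p2", "category": "books", "sales": 50, "row_num": 1},
    {"product_id": "p1", "category": "books", "sales": 30, "row_num": 2},
    {"product_id": "p4", "category": "toys", "sales": 70, "row_num": 1},
    {"product_id": "p5", "category": "toys", "sales": 20, "row_num": 2},
]

print(is_unique_key(result, ["category", "row_num"]))  # partition column + row number
print(is_unique_key(result, ["product_id"]))           # key derived from upstream
print(is_unique_key(result, ["row_num"]))              # row number alone repeats per category
```

Neither category nor row_num identifies a row on its own; only the combination does, alongside the product_id key inherited from the source table.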
No Ranking Output Optimization
As described above, the rownum field is written into the result table as one field of the unique key, which may lead to a lot of records being written to the result table. For example, when the record at rank 9 (say product-1001) is updated and its rank rises to 1, all the records at ranks 1 to 9 are output to the result table as update messages. If the result table receives too much data, it becomes the bottleneck of the SQL job.
The optimization is to omit the rownum field from the outer SELECT clause of the Top-N query. This is reasonable because the number of top N records is usually small, so consumers can quickly sort the records themselves. Without the rownum field, in the example above, only the changed record (product-1001) needs to be sent downstream, which greatly reduces the IO to the result table.
The following example shows how to optimize the above Top-N example in this way:
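The difference in output volume can be illustrated with a small Python simulation of the rank 9 to rank 1 scenario. It compares the result before and after the update and counts the rows that differ. This is a simplified model under assumed sample data, not Flink's actual changelog encoding (which may emit separate retract and update messages); the helper names `top_n` and `changed_rows` are hypothetical.

```python
def top_n(sales_by_product, n):
    """Rank products by sales (descending) and keep the first n as (rank, product_id, sales)."""
    ordered = sorted(sales_by_product.items(), key=lambda kv: (-kv[1], kv[0]))
    return [(rank, pid, sales) for rank, (pid, sales) in enumerate(ordered[:n], start=1)]


def changed_rows(before, after, with_rank):
    """Return the rows of `after` that differ from `before`, with or without the rank column."""
    project = (lambda r: r) if with_rank else (lambda r: r[1:])
    old = {project(r) for r in before}
    return [project(r) for r in after if project(r) not in old]


# Nine products in one category: product-1 .. product-8 have sales 90 .. 20,
# and product-1001 starts at rank 9 with sales 10.
sales = {f"product-{i}": 100 - 10 * i for i in range(1, 9)}
sales["product-1001"] = 10
before = top_n(sales, n=10)

sales["product-1001"] = 95  # the update lifts product-1001 to rank 1
after = top_n(sales, n=10)

with_rank = changed_rows(before, after, with_rank=True)
without_rank = changed_rows(before, after, with_rank=False)
print(len(with_rank), len(without_rank))  # prints: 9 1
```

With the rank column, every row from rank 1 to rank 9 changes because each rank shifts by one. Without it, only the row for product-1001 differs.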
CREATE TABLE ShopSales (
  product_id   STRING,
  category     STRING,
  product_name STRING,
  sales        BIGINT
) WITH (...);

SELECT product_id, category, product_name, sales
FROM (
  SELECT *,
    ROW_NUMBER() OVER (PARTITION BY category ORDER BY sales DESC) AS row_num
  FROM ShopSales)
WHERE row_num <= 5
Attention in streaming mode: To output the above query to external storage and get a correct result, the external storage must have the same unique key as the Top-N query. In the example query above, if product_id is the unique key of the query, then the external table should also have product_id as its unique key.
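Why the keys must match can be seen in a minimal Python model of an upsert sink. The dict-backed table and the function `apply_upserts` are hypothetical stand-ins for an external store, and delete messages (for products that drop out of the top N) are not modeled.

```python
def apply_upserts(messages, key_columns):
    """Apply upsert messages to a dict-backed table keyed on key_columns."""
    table = {}
    for row in messages:
        key = tuple(row[c] for c in key_columns)
        table[key] = row  # insert, or overwrite the existing row with the same key
    return table


# Two successive versions of the same product, as emitted by the query without row_num.
messages = [
    {"product_id": "product-1001", "category": "books", "sales": 10},
    {"product_id": "product-1001", "category": "books", "sales": 95},
]

matching = apply_upserts(messages, ["product_id"])             # same key as the query
mismatched = apply_upserts(messages, ["product_id", "sales"])  # a different key

print(len(matching), len(mismatched))  # prints: 1 2
```

With the matching key the second message overwrites the first, so the table holds only the latest sales value. With the mismatched key the stale row with sales 10 stays in the table next to the new one.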