1- 背景信息
以下案例是实时计算的合作伙伴袋鼠云通过阿里云实时计算来完成电商订单管理的案例。
2- 业务架构图
业务流程:
- 使用数据传输服务DTS把您的数据同步到大数据总线(DataHub)。
- 阿里云实时计算订阅大数据总线(DataHub)的数据进行实时计算。
- 将实时数据插入到RDS的云数据库。
- 通过阿里云的DataV或者是其他的大屏完成数据展示。
3- 准备工作
将RDS MySQL产生的增量数据实时同步到DataHub中的Topic。由RDS经过DTS数据同步到大数据总线(DataHub)Schema表的信息。
表 1. orders_real源表 表 2. orderdetail_real源表
4- 编写业务逻辑
create table orders_real(
dts_ordercodeofsys varchar,
dts_paytime bigint,
dts_deliveredtime varchar,
dts_storecode varchar,
dts_warehousecode varchar,
dts_cancelled bigint,
dts_delivered bigint,
dts_receivercity varchar,
dts_receiverprovince varchar,
dts_record_id varchar,
dts_operation_flag varchar,
dts_instance_id varchar,
dts_db_name varchar,
dts_table_name varchar,
dts_utc_timestamp varchar,
dts_before_flag varchar,
dts_after_flag varchar
) with (
type='datahub',
endPoint='http://dh-cn-****.com',
project='your',
topic='表名',
accessId='您的ID',
accessKey='您的KEY'
);
create table orderdetail_real(
dts_ordercodeofsys varchar,
dts_skuname varchar,
dts_skucode varchar,
dts_quantity bigint,
dts_dividedamount double,
dts_salechanneldividedamount double,
dts_initialcost double,
dts_record_id varchar,
dts_operation_flag varchar,
dts_instance_id varchar,
dts_db_name varchar,
dts_table_name varchar,
dts_utc_timestamp varchar,
dts_before_flag varchar,
dts_after_flag varchar
) with (
type='datahub',
endPoint='http://dh-cn-****.com',
project='yourPorjectName',
topic='yourTableName',
accessId='yourAccessId',
accessKey='yourAccessSecret'
);
create table ads_all_count_amount(
bill_date varchar,
bill_count bigint,
qty bigint,
primary key (bill_date)
) with (
type='rds',
url='jdbc:mysql://rm-XXXX.mysql.rds.aXXXXcs.com:3306/XXXX',
tableName='yourDatabaseTableName',
userName='yourDatabaseAccount',
password='yourDatabasePassword'
);
CREATE VIEW new_paytime AS
SELECT
dts_ordercodeofsys,
MAX(dts_paytime) AS dts_paytime
FROM orders_real
GROUP BY dts_ordercodeofsys ;
CREATE VIEW new_orderdetail AS
SELECT
dts_ordercodeofsys,
dts_skuname,
dts_skucode,
CASE WHEN dts_operation_flag='U'
AND dts_before_flag='Y'
AND dts_after_flag='N' THEN -1*dts_quantity
WHEN dts_operation_flag='U'
AND dts_before_flag='N'
AND dts_after_flag='Y' THEN dts_quantity
WHEN dts_operation_flag='D' THEN -1*dts_quantity
ELSE dts_quantity
END AS dts_quantity
FROM
orderdetail_real;
INSERT INTO ads_all_count_amount
SELECT
FROM_UNIXTIME(cast(a.dts_paytime/1000000 AS bigint),'yyyyMMdd') AS bill_date,
COUNT(DISTINCT a.dts_ordercodeofsys) AS bill_count,
SUM(b.dts_quantity) AS qty
from
new_paytime a
join
new_orderdetail b
ON a.dts_ordercodeofsys=b.dts_ordercodeofsys
GROUP BY
FROM_UNIXTIME(CAST(a.dts_paytime/1000000 AS bigint),'yyyyMMdd');
5- 难点解析
为了方便您理解结构化代码和代码维护,推荐使用View(数据视图概念)将业务逻辑拆分为三个模块。
- 模块一 :根据订单编号进行分组
同一个编号订单会有多次业务操作(例如下单、付款、发货),并在Binlog日志中形成多条同一订单编号的订单流水记录。使用MAX(dts_paytime)获取同一编号的最后一次操作数据库最终付款交易时间。
CREATE VIEW new_paytime AS
SELECT
dts_ordercodeofsys,
MAX(dts_paytime) AS dts_paytime
FROM orders_real
GROUP BY dts_ordercodeofsys;
CREATE VIEW new_orderdetail AS
SELECT
dts_ordercodeofsys,
dts_skuname,
dts_skucode,
CASE WHEN dts_operation_flag='U'
AND dts_before_flag='Y'
AND dts_after_flag='N' THEN -1*dts_quantity
WHEN dts_operation_flag='U'
AND dts_before_flag='N'
AND dts_after_flag='Y' THEN dts_quantity
WHEN dts_operation_flag='D' THEN -1*dts_quantity
ELSE dts_quantity
END AS dts_quantity
FROM orderdetail_real;
数据库日志会获取所有的数据记录的变更,而每个订单是有状态的,如下表所示。
对于不同的操作类型,增量日志中的dts_before_flag和dts_after_flag定义如下:
- 操作类型为Insert
所有Column值为新插入的记录值,即为更新后的值,所以dts_before_flag=N,dts_after_flag=Y。 - 操作类型为Update
Update操作被拆为2条增量日志。这两条增量日志的dts_record_id,dts_operation_flag及dts_utc_timestamp相同。第一条日志记录更新前的值,所以dts_before_flag=Y,dts_after_flag=N。第二条日志记录了更新后的值,所以dts_before_flag=N,dts_after_flag=Y。
- 操作类型为Delete
所有Column值为被删除的记录值,即为更新前的值。所以dts_before_flag=Y,dts_after_flag=N。
SELECT
from_unixtime(CAST(a.dts_paytime/1000000 AS bigint),'yyyyMMdd') AS bill_date,
COUNT(DISTINCTa.dts_ordercodeofsys) AS bill_count,
SUM(b.dts_quantity) AS qty
from
new_paytime as a
join
new_orderdetail as b
ON
a.dts_ordercodeofsys=b.dts_ordercodeofsys
GROUP BY
from_unixtime(CAST(a.dts_paytime/1000000 AS bigint),'yyyyMMdd');
6- DEMO示例以及源代码
根据上文介绍的订单与销量统计解决方案,为您创建了一个包含完整链路的DEMO示例,如下所示: DataHub作为源表。 RDS作为结果表。
7- 常见问题
Q:模块二中,如何判断有效交易订单? A:首先是要满足dts_operation_flag=U或者dts_operation_flag=I,然后dts_before_flag代表的是变更前订单状态,dts_after_flag是变更后订单状态。所以有效交易订单如下。
dts_operation_flag='U'
AND dts_before_flag='N'
AND dts_after_flag='Y' THEN dts_quantity
Q:模块二中,为什么THEN -1*dts_quantity? A:订单的取消或者是交易没有成功,在总的销量里也会记录。为了保证总销量的正确性,所以把没有成交的订单数量设为负数,在计算总的销量会减去这个数量。
Q:模块三中,为什么订单源表和订单详情需要进行JOIN操作? A:new_paytime查出的是最新交易的时间的所有的订单编号。new_orderdetail查询的是所有有效订单的订单编码、商品名称、商品编号、数量的信息。两张表JOIN是为了方便用户来统计订单总数和总的销量。
|