网站怎么上百度,wordpress配置虚拟主机,知名网站设计,上海软件定制开发Over 聚合定义#xff08;⽀持 Batch\Streaming#xff09;#xff1a;**特殊的滑动窗⼝聚合函数#xff0c;拿 Over 聚合 与 窗⼝聚合 做对⽐。
窗⼝聚合#xff1a;不在 group by 中的字段#xff0c;不能直接在 select 中拿到
Over 聚合#xff1a;能够保留原始字段…Over 聚合定义⽀持 Batch\Streaming**特殊的滑动窗⼝聚合函数拿 Over 聚合 与 窗⼝聚合 做对⽐。
窗⼝聚合不在 group by 中的字段不能直接在 select 中拿到
Over 聚合能够保留原始字段
注意 ⽣产环境中Over 聚合的使⽤场景较少。
**应⽤场景**计算最近⼀段滑动窗⼝的聚合结果数据。
**实际案例**查询每个产品最近⼀⼩时订单的⾦额总和
SELECT order_id,order_time,amount,SUM(amount) OVER (PARTITION BY productORDER BY order_timeRANGE BETWEEN INTERVAL 1 HOUR PRECEDING AND CURRENT ROW) AS one_hour_prod_amount_sum
FROM OrdersOver 聚合语法如下
SELECTagg_func(agg_col) OVER ([PARTITION BY col1[, col2, ...]]ORDER BY time_colrange_definition),...
FROM ...ORDER BY必须是时间戳列事件时间、处理时间
PARTITION BY标识了聚合窗⼝的聚合粒度如上述案例是按照 product 进⾏聚合
range_definition标识聚合窗⼝的聚合数据范围在 Flink 中有两种指定数据范围的⽅式。第⼀种为 按照⾏数聚合 第⼆种为 按照时间区间聚合 。
1.时间区间聚合
**案例**输出一个产品最近⼀⼩时数据的 amount 之和。
结果就是最近⼀⼩时数据的 amount 之和。
CREATE TABLE source_table (order_id BIGINT,product BIGINT,amount BIGINT,order_time as cast(CURRENT_TIMESTAMP as TIMESTAMP(3)),WATERMARK FOR order_time AS order_time - INTERVAL 0.001 SECOND
) WITH (connector datagen,rows-per-second 1,fields.order_id.min 1,fields.order_id.max 2,fields.amount.min 1,fields.amount.max 10,fields.product.min 1,fields.product.max 2
);CREATE TABLE sink_table (product BIGINT,order_time TIMESTAMP(3),amount BIGINT,one_hour_prod_amount_sum BIGINT
) WITH (connector print
);INSERT INTO sink_table
SELECT product,order_time,amount,SUM(amount) OVER (PARTITION BY productORDER BY order_time-- 标识统计范围是⼀个 product 的最近 1 ⼩时的数据RANGE BETWEEN INTERVAL 1 HOUR PRECEDING AND CURRENT ROW) AS one_hour_prod_amount_sum
FROM source_table结果如下
I[2, 2021-12-24T22:08:26.583, 7, 73]
I[2, 2021-12-24T22:08:27.583, 7, 80]
I[2, 2021-12-24T22:08:28.583, 4, 84]
I[2, 2021-12-24T22:08:29.584, 7, 91]
I[2, 2021-12-24T22:08:30.583, 8, 99]
I[1, 2021-12-24T22:08:31.583, 9, 138]
I[2, 2021-12-24T22:08:32.584, 6, 105]
I[1, 2021-12-24T22:08:33.584, 7, 145]2.⾏数聚合
**案例**输出一个产品最近 5 ⾏数据的 amount 之和。
CREATE TABLE source_table (order_id BIGINT,product BIGINT,amount BIGINT,order_time as cast(CURRENT_TIMESTAMP as TIMESTAMP(3)),WATERMARK FOR order_time AS order_time - INTERVAL 0.001 SECOND
) WITH (connector datagen,rows-per-second 1,fields.order_id.min 1,fields.order_id.max 2,fields.amount.min 1,fields.amount.max 2,fields.product.min 1,fields.product.max 2
);CREATE TABLE sink_table (product BIGINT,order_time TIMESTAMP(3),amount BIGINT,one_hour_prod_amount_sum BIGINT
) WITH (connector print
);INSERT INTO sink_table
SELECT product,order_time,amount,SUM(amount) OVER (PARTITION BY productORDER BY order_time-- 标识统计范围是⼀个 product 的最近 5 ⾏数据ROWS BETWEEN 5 PRECEDING AND CURRENT ROW) AS one_hour_prod_amount_sum
FROM source_table结果如下
I[2, 2021-12-24T22:18:19.147, 1, 9]
I[1, 2021-12-24T22:18:20.147, 2, 11]
I[1, 2021-12-24T22:18:21.147, 2, 12]
I[1, 2021-12-24T22:18:22.147, 2, 12]
I[1, 2021-12-24T22:18:23.148, 2, 12]
I[1, 2021-12-24T22:18:24.147, 1, 11]
I[1, 2021-12-24T22:18:25.146, 1, 10]
I[1, 2021-12-24T22:18:26.147, 1, 9]
I[2, 2021-12-24T22:18:27.145, 2, 11]
I[2, 2021-12-24T22:18:28.148, 1, 10]
I[2, 2021-12-24T22:18:29.145, 2, 10]在⼀个 SELECT 中有多个聚合窗⼝简化写法如下
SELECT order_id,order_time,amount,SUM(amount) OVER w AS sum_amount,AVG(amount) OVER w AS avg_amount
FROM Orders
-- 使⽤下⾯⼦句定义 Over Window
WINDOW w AS (PARTITION BY productORDER BY order_timeRANGE BETWEEN INTERVAL 1 HOUR PRECEDING AND CURRENT ROW)