当前位置: 首页 > news >正文

可以做分销的淘客网站企业微信后台管理系统

可以做分销的淘客网站,企业微信后台管理系统,做网站文章要一篇一篇的写吗,电商购物app定制开发一、说明 时间属性是大数据中的一个重要方面#xff0c;像窗口#xff08;在 Table API 和 SQL #xff09;这种基于时间的操作#xff0c;需要有时间信息。我们可以通过时间属性来更加灵活高效地处理数据#xff0c;下面我们通过处理时间和事件时间来探讨一下Flink SQL …一、说明 时间属性是大数据中的一个重要方面像窗口在 Table API 和 SQL 这种基于时间的操作需要有时间信息。我们可以通过时间属性来更加灵活高效地处理数据下面我们通过处理时间和事件时间来探讨一下Flink SQL 时间属性。 二、处理时间 2.1、准备WaterSensor类方便使用 package com.lyh.bean;import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor;Data NoArgsConstructor AllArgsConstructor public class WaterSensor {private String id;private Long ts;private Integer vc; }2.2、DataStream 到 Table 转换时定义 处理时间属性可以在 schema 定义的时候用 .proctime 后缀来定义。时间属性一定不能定义在一个已有字段上所以它新增一个字段。 代码段 package com.lyh.flink12;import com.lyh.bean.WaterSensor; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; import static org.apache.flink.table.api.Expressions.$;public class Flink_Sql_Proctime {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);DataStreamSourceWaterSensor waterSensorStream env.fromElements(new WaterSensor(sensor_1, 1000L, 10),new WaterSensor(sensor_1, 2000L, 20),new WaterSensor(sensor_2, 3000L, 30),new WaterSensor(sensor_1, 4000L, 40),new WaterSensor(sensor_1, 5000L, 50),new WaterSensor(sensor_2, 6000L, 60)); // 1. 创建表的执行环境StreamTableEnvironment tableEnv StreamTableEnvironment.create(env); // 声明一个额外的字段来作为处理时间字段Table sensorTable tableEnv.fromDataStream(waterSensorStream, $(id), $(ts), $(vc), $(pt).proctime());sensorTable.execute().print();} }执行结果 2.3、创建数据文件sensor.txt 数据方便使用 sensor_1,1,10 sensor_1,2,20 sensor_2,4,30 sensor_1,4,400 sensor_2,5,50 sensor_2,6,602.4、在创建表的 DDL 中定义 package com.lyh.flink12;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.TableResult; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;public class Flink_Sql_ddl_Procetime {public static void main(String[] args) {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);StreamTableEnvironment tableEnv StreamTableEnvironment.create(env);tableEnv.executeSql(create table sensor(id string,ts bigint,vc int,pt_time as PROCTIME()) with( connector filesystem, path input/sensor.txt, format csv ));Table table tableEnv.sqlQuery(select * from sensor);table.execute().print();} }运行结果 三、事件时间 事件时间允许程序按照数据中包含的时间来处理这样可以在有乱序或者晚到的数据的情况下产生一致的处理结果。它可以保证从外部存储读取数据后产生可以复现replayable的结果。 除此之外事件时间可以让程序在流式和批式作业中使用同样的语法。在流式程序中的事件时间属性在批式程序中就是一个正常的时间字段。 为了能够处理乱序的事件并且区分正常到达和晚到的事件Flink 需要从事件中获取事件时间并且产生 watermarkwatermarks。 3.1、DataStream 到 Table 转换时定义 事件时间属性可以用 .rowtime 后缀在定义 DataStream schema 的时候来定义。时间戳和 watermark 在这之前一定是在 DataStream 上已经定义好了。 在从 DataStream 到 Table 转换时定义事件时间属性有两种方式。取决于用 .rowtime 后缀修饰的字段名字是否是已有字段事件时间字段可以是 1、在 schema 的结尾追加一个新的字段 2、替换一个已经存在的字段。 不管在哪种情况下事件时间字段都表示 DataStream 中定义的事件的时间戳。 代码 援用上面WaterSensor类 package com.lyh.flink12;import com.lyh.bean.WaterSensor; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;import java.time.Duration;import static org.apache.flink.table.api.Expressions.$;public class Flink_Sql_EventTime {public static void main(String[] args) {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperatorWaterSensor waterSensorSource env.fromElements(new WaterSensor(sensor_1, 1000L, 100),new WaterSensor(sensor_1, 1000L, 100),new WaterSensor(sensor_2, 1000L, 200),new WaterSensor(sensor_2, 1000L, 200)).assignTimestampsAndWatermarks(WatermarkStrategy.WaterSensorforBoundedOutOfOrderness(Duration.ofSeconds(2)).withTimestampAssigner((element, recordtime) - element.getTs()));StreamTableEnvironment tableEnv StreamTableEnvironment.create(env);tableEnv.fromDataStream(waterSensorSource,$(id),$(ts),$(vc),$(pt).rowtime()).execute().print();} }运行结果 3.2、使用已有的字段作为时间属性 .fromDataStream(waterSensorStream, $(id), $(ts).rowtime(), $(vc));3.3、在创建表的 DDL 中定义 事件时间属性可以用 WATERMARK 语句在 CREATE TABLE DDL 中进行定义。WATERMARK 语句在一个已有字段上定义一个 watermark 生成表达式同时标记这个已有字段为时间属性字段. package com.lyh.flink12;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.TableResult; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;public class Flink_Sql_ddl_EventTime {public static void main(String[] args) {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);StreamTableEnvironment tableEnv StreamTableEnvironment.create(env);tableEnv.executeSql(create table sensor( id string, ts bigint, vc int, t as to_timestamp(from_unixtime(ts/1000,yyyy-MM-dd HH:mm:ss)), watermark for t as t - interval 5 second) with( connector filesystem, path input/sensor.txt, format csv ));tableEnv.sqlQuery(select * from sensor).execute().print();} }运行结果 说明: 1.把一个现有的列定义为一个为表标记事件时间的属性。该列的类型必须为 TIMESTAMP(3)且是 schema 中的顶层列它也可以是一个计算列。 2.严格递增时间戳 WATERMARK FOR rowtime_column AS rowtime_column。 3.递增时间戳 WATERMARK FOR rowtime_column AS rowtime_column - INTERVAL ‘0.001’ SECOND。 乱序时间戳 WATERMARK FOR rowtime_column AS rowtime_column - INTERVAL ‘string’ timeUnit。
http://www.lakalapos1.cn/news/17306/

相关文章:

  • 荣耀商城官网网站android开发技术
  • 长春网站建站有什么网站可以接活做设计
  • 上海高端品牌网站制作wordpress当下载站
  • 国外最牛设计网站wordpress 标题优化
  • 山东中讯做网站怎么样wordpress网站地图插件
  • 织梦开发网站台州模板建站代理
  • ui下载高清素材的网站有哪些如何建立免费网站
  • 义乌网站建设yw126免费seo刷排名
  • 石家庄做网站电话aspx网站做app
  • 顺的网站建设精英wordpress 注册
  • 美食网站的建设背景百度广告推广费用年费
  • 网站用wordpress还是wp用织梦做的手机网站怎么才能和电脑同步
  • 乐清门户网站建设做国际贸易都用什么网站
  • 公司网站里面页面链接怎么做ppt那个网站做的好
  • 河南有名的做网站公司有哪些做公众号需要网站
  • 网站建设公司找哪家网站制作与美育融合
  • 绍兴建设银行网站首页ghost 博客wordpress
  • 济南网站制作公司排名如何在电脑上打开自己做的网站
  • 龙岩网站推广软件wordpress程序的主题
  • 皂君庙网站建设给一个网站风格做定义
  • 手机网站开发调用照片专业做网站安全的sine安
  • 网站开发毕业设计说明原创小说网站建设源码
  • 让百度收录整个网站如何开发一个安卓app
  • wordpress做管理网站网站建设的感想和建议
  • 外包建设网站服务网站建设沛宣
  • 义乌网站建设公司代理wordpress讨论组
  • 文登住房和城乡建设局网站网站建设平台方案设计
  • 网站排版个人网站备案名称要求
  • h5网站开发设计成都优创智汇网站建设
  • 做二手车按揭的网站帝国cms 网站地图 xml