Contents

I. Offline Batch Processing with Hive
1. Implementation Steps
① Start Hadoop and Hive
② Create and use the weblog database in Hive
③ Create an external table to manage the data
④ Add the current day's partition to the master table
⑤ Create a data-cleaning table that keeps only the fields the business needs
⑥ Business processing
⑦ Create the statistics table
⑧ Query the day's metrics from the cleaned table and insert them into the statistics table
⑨ Use Sqoop to export the data from HDFS into MySQL
II. Hive Placeholders and Script Files
1. Overview
2. Invoking a Hive script file
3. Using Hive placeholders
4. Applying this to the business
5. Linux crontab scheduled tasks
III. Building the Real-Time Pipeline
1. Connecting Flume and Kafka
IV. Setting Up the Real-Time Streaming Environment
1. Spark and HBase integration basics
2. Real-time stream processing

I. Offline Batch Processing with Hive
1. Implementation Steps

① Start Hadoop and Hive

Go to Hive's bin directory and start the services in the background:

nohup hive --service metastore &
nohup hive --service hiveserver2 &
sh hive

② Create and use the weblog database in Hive

create database weblog;
use weblog;

③ Create an external table to manage the data

Create a master table that manages all of the raw fields. Characteristics of the master table: it holds every field, it is an external table, it is partitioned, and its data lives on HDFS. Table DDL:

create external table flux (url string,urlname string,title string,chset string,scr string,col string,lg string,je string,ec string,fv string,cn string,ref string,uagent string,stat_uv string,stat_ss string,cip string) PARTITIONED BY (reporttime string) row format delimited fields terminated by '|' location '/weblog';

④ Add the current day's partition to the master table

Either of the following works:

1. msck repair table flux;
2. alter table flux add partition(reporttime='2022-04-20') location '/weblog/reporttime=2022-04-20';

⑤ Create a data-cleaning table that keeps only the fields the business needs

dataclear uses '|' as its delimiter. It drops the unneeded fields, keeps only the required ones, and splits the session information into separate columns. Required fields: reporttime, url, urlname, uvid, ssid, sscount, sstime, cip.

create table dataclear(reportTime string,url string,urlname string,uvid string,ssid string,sscount string,sstime string,cip string) row format delimited fields terminated by '|';

Query the day's fields from the master table and insert them into the cleaning table:

insert overwrite table dataclear
select reporttime,url,urlname,stat_uv,split(stat_ss,'_')[0],split(stat_ss,'_')[1],split(stat_ss,'_')[2],cip from flux;

⑥ Business processing

1. pv - page views:

select count(*) as pv from dataclear where reportTime = '2022-04-20';

2. uv - unique visitors - the number of visitors in a day, i.e. the number of distinct uvid values for the day:

select count(distinct uvid) as uv from dataclear where reportTime = '2022-04-20';

3. vv - unique sessions - the number of sessions in a day, i.e. the number of distinct ssid values for the day:

select count(distinct ssid) as vv from dataclear where reportTime = '2022-04-20';

4. br - bounce rate - the number of bounced sessions in a day divided by the total number of sessions:

select round(br_taba.a/br_tabb.b,4) as br from (select count(*) as a from (select ssid from dataclear where reportTime='2022-04-20' group by ssid having count(ssid) = 1) as br_tab) as br_taba,(select count(distinct ssid) as b from dataclear where reportTime='2022-04-20') as br_tabb;

This SQL groups by session id and counts the sessions that contain exactly one visit; those are the bounced sessions.

5. newip - new IP count - the number of distinct IPs seen today that never appeared in the historical data:

select count(distinct dataclear.cip) from dataclear where dataclear.reportTime = '2022-04-20' and cip not in (select dc2.cip from dataclear as dc2 where dc2.reportTime < '2022-04-20');

6. newcust - new visitor count - the number of distinct uvid values seen today that never appeared in the historical data:

select count(distinct dataclear.uvid) from dataclear where dataclear.reportTime = '2022-04-20' and uvid not in (select dc2.uvid from dataclear as dc2 where dc2.reportTime < '2022-04-20');

7. avgtime - average session duration - the average of all session durations in a day. Note: a session's duration is the maximum visit timestamp in the session minus the minimum visit timestamp:

select avg(atTab.usetime) as avgtime from (select max(sstime) - min(sstime) as usetime from dataclear where reportTime='2022-04-20' group by ssid) as atTab;

8. avgdeep - average session depth - the average of all session depths in a day. A session's depth is the number of distinct URLs it visited. For example, a session that visits http://demo/a.jsp, http://demo/b.jsp, http://demo/a.jsp has a depth of 2:

select round(avg(adTab.deep),4) as avgdeep from (select count(distinct urlname) as deep from dataclear where reportTime='2022-04-20' group by ssid) as adTab;

⑦ Create the statistics table

create table tongji(reportTime string,pv int,uv int,vv int,br double,newip int,newcust int,avgtime double,avgdeep double) row format delimited fields terminated by '|';

⑧ Query the day's metrics from the cleaned table and insert them into the statistics table

insert overwrite table tongji select '2022-04-20',tab1.pv,tab2.uv,tab3.vv,tab4.br,tab5.newip,tab6.newcust,tab7.avgtime,tab8.avgdeep from (select count(*) as pv from dataclear where reportTime = '2022-04-20') as tab1,
(select count(distinct uvid) as uv from dataclear where reportTime = '2022-04-20') as tab2,
(select count(distinct ssid) as vv from dataclear where reportTime = '2022-04-20') as tab3,
(select round(br_taba.a/br_tabb.b,4) as br from (select count(*) as a from (select ssid from dataclear where reportTime='2022-04-20' group by ssid having count(ssid) = 1) as br_tab) as br_taba,
(select count(distinct ssid) as b from dataclear where reportTime='2022-04-20') as br_tabb) as tab4,
(select count(distinct dataclear.cip) as newip from dataclear where dataclear.reportTime = '2022-04-20' and cip not in (select dc2.cip from dataclear as dc2 where dc2.reportTime < '2022-04-20')) as tab5,
(select count(distinct dataclear.uvid) as newcust from dataclear where dataclear.reportTime='2022-04-20' and uvid not in (select dc2.uvid from dataclear as dc2 where dc2.reportTime < '2022-04-20')) as tab6,
(select round(avg(atTab.usetime),4) as avgtime from (select max(sstime) - min(sstime) as usetime from dataclear where reportTime='2022-04-20' group by ssid) as atTab) as tab7,
(select round(avg(deep),4) as avgdeep from (select count(distinct urlname) as deep from dataclear where reportTime='2022-04-20' group by ssid) as adTab) as tab8;

⑨ Use Sqoop to export the data from HDFS into MySQL

Log in to MySQL:

mysql -uroot -proot

Create and use the database:

create database weblog;
use weblog;

Create the table:

create table tongji(reporttime varchar(40),pv int,uv int,vv int,br double,newip int,newcust int,avgtime double,avgdeep double);

Use Sqoop to export the data into MySQL. Go to the Sqoop bin directory:

cd /home/software/sqoop-1.4.7/bin/

and run:

sh sqoop export --connect jdbc:mysql://hadoop01:3306/weblog --username root --password root --export-dir /user/hive/warehouse/weblog.db/tongji --table tongji -m 1 --fields-terminated-by '|'

Finally, log back in to MySQL and query the tongji table (for example, select * from tongji;) to confirm that the export succeeded.
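If you would rather check the exported rows from code than from the mysql shell, the sketch below uses plain JDBC. It is only a sketch under a couple of assumptions: the mysql-connector-java driver (the same artifact the streaming project adds to its pom later in this article) is on the classpath, and MySQL runs on hadoop01:3306 with user root/root as in the commands above; the object name TongjiExportCheck is made up for illustration.

package cn.yang.check

import java.sql.DriverManager

object TongjiExportCheck {
  def main(args: Array[String]): Unit = {
    // Load the MySQL 5.1.x driver class (harmless if the driver is auto-registered)
    Class.forName("com.mysql.jdbc.Driver")
    val conn = DriverManager.getConnection("jdbc:mysql://hadoop01:3306/weblog", "root", "root")
    try {
      val stmt = conn.createStatement()
      // Same columns as the tongji table created above
      val rs = stmt.executeQuery("select reporttime,pv,uv,vv,br,newip,newcust,avgtime,avgdeep from tongji")
      while (rs.next()) {
        // Print each exported row pipe-separated, mirroring the Hive delimiter
        println(Seq(rs.getString(1), rs.getInt(2), rs.getInt(3), rs.getInt(4), rs.getDouble(5),
          rs.getInt(6), rs.getInt(7), rs.getDouble(8), rs.getDouble(9)).mkString("|"))
      }
    } finally {
      conn.close()
    }
  }
}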
II. Hive Placeholders and Script Files

1. Overview

The work above shows that we have to write HQL by hand to complete the offline ETL. Doing that manually every day is clearly impractical, so we can use Hive script files and placeholders to automate it.
2. Invoking a Hive script file

Implementation steps:

① Write a file with the .hive suffix. For example, create a file named 01.hive whose purpose is to create a table tb1 in the weblog database:

use weblog;
create table tb1 (id int,name string);

② Go to the bin directory of the Hive installation and run:

sh hive -f 01.hive

Note: the -f option is followed by the path to the 01.hive file.

③ Check in Hive that the table was created successfully.

3. Using Hive placeholders
Suppose we now want to drop the tb1 table through a Hive script file. We can do it like this:

① Create a 02.hive file:

use weblog;
drop table ${tb_name};

② In the bin directory, run:

sh hive -f 02.hive -d tb_name=tb1

4. Applying this to the business
The final insert in Hive touches a log partition that is created per day, so up to now we had to type the date (for example 2022-04-20) by hand. We can do the following instead:

① Replace every date literal in the HQL with a placeholder and save the statement in a weblog.hive file:

use weblog;
insert overwrite table tongji select '${reportTime}',tab1.pv,tab2.uv,tab3.vv,tab4.br,tab5.newip,tab6.newcust,tab7.avgtime,tab8.avgdeep from (select count(*) as pv from dataclear where reportTime = '${reportTime}') as tab1,(select count(distinct uvid) as uv from dataclear where reportTime = '${reportTime}') as tab2,(select count(distinct ssid) as vv from dataclear where reportTime = '${reportTime}') as tab3,(select round(br_taba.a/br_tabb.b,4) as br from (select count(*) as a from (select ssid from dataclear where reportTime='${reportTime}' group by ssid having count(ssid) = 1) as br_tab) as br_taba,(select count(distinct ssid) as b from dataclear where reportTime='${reportTime}') as br_tabb) as tab4,(select count(distinct dataclear.cip) as newip from dataclear where dataclear.reportTime = '${reportTime}' and cip not in (select dc2.cip from dataclear as dc2 where dc2.reportTime < '${reportTime}')) as tab5,(select count(distinct dataclear.uvid) as newcust from dataclear where dataclear.reportTime='${reportTime}' and uvid not in (select dc2.uvid from dataclear as dc2 where dc2.reportTime < '${reportTime}')) as tab6,(select round(avg(atTab.usetime),4) as avgtime from (select max(sstime) - min(sstime) as usetime from dataclear where reportTime='${reportTime}' group by ssid) as atTab) as tab7,(select round(avg(deep),4) as avgdeep from (select count(distinct urlname) as deep from dataclear where reportTime='${reportTime}' group by ssid) as adTab) as tab8;

② In Hive's bin directory, run:

sh hive -f weblog.hive -d reportTime=2022-04-20

If you do not want to type the date by hand, you can get it from a Linux command: date +%Y-%m-%d. So the Hive script can be invoked as:

sh hive -f weblog.hive -d reportTime=`date +%Y-%m-%d`

(note: those are backtick characters), or equivalently:

sh hive -f weblog.hive -d reportTime=$(date +%Y-%m-%d)

5. Linux crontab scheduled tasks
In real work a job like this (for example, a database backup at midnight every day) has to run automatically, so we set up a scheduled task.

The crontab command schedules commands to run at fixed intervals. You can edit the schedule with crontab -e.

crontab entry format:

* * * * * command
minute hour day-of-month month day-of-week command

Example:

*/1 * * * * rm -rf /home/software/1.txt

deletes the 1.txt file in that directory every minute. For this project we can write:

45 23 * * * /home/software/hive-3.1.2/bin/hive -f /home/software/hive-3.1.2/bin/weblog.hive -d reportTime=`date +%Y-%m-%d`
III. Building the Real-Time Pipeline

1. Connecting Flume and Kafka

1. Start the ZooKeeper cluster.
2. Start the Kafka cluster; in its bin directory run:

sh kafka-server-start.sh ../config/server.properties

3. Create the topic.

List topics: sh kafka-topics.sh --list --zookeeper hadoop01:2181
Create the topic: sh kafka-topics.sh --create --zookeeper hadoop01:2181 --replication-factor 1 --partitions 1 --topic fluxdata

4. Configure weblog.conf under Flume's data directory (a directory we created ourselves):

a1.sources=r1
a1.channels=c1 c2
a1.sinks=k1 k2

a1.sources.r1.type=avro
a1.sources.r1.bind=0.0.0.0
a1.sources.r1.port=44444
a1.sources.r1.interceptors=i1
a1.sources.r1.interceptors.i1.type=timestamp

a1.sinks.k1.type=hdfs
a1.sinks.k1.hdfs.path=hdfs://192.168.186.128:9000/weblog/reportTime=%Y-%m-%d
a1.sinks.k1.hdfs.fileType=DataStream
a1.sinks.k1.hdfs.rollInterval=0
a1.sinks.k1.hdfs.rollSize=0
a1.sinks.k1.hdfs.rollCount=1000

a1.sinks.k2.type=org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k2.brokerList=hadoop01:9092,hadoop02:9092,hadoop03:9092
a1.sinks.k2.topic=fluxdata

a1.channels.c1.type=memory
a1.channels.c1.capacity=1000
a1.channels.c1.transactionCapacity=100

a1.channels.c2.type=memory
a1.channels.c2.capacity=1000
a1.channels.c2.transactionCapacity=100

a1.sources.r1.channels=c1 c2
a1.sinks.k1.channel=c1
a1.sinks.k2.channel=c2

Start Hadoop, then start Flume from its data directory:

../bin/flume-ng agent -n a1 -c ./ -f ./weblog.conf -Dflume.root.logger=INFO,console

5. Start Tomcat and visit the page instrumented with the tracking (embedded-point) server.
6. Check whether Kafka receives the data. Go to Kafka's bin directory and start a console consumer:

sh kafka-console-consumer.sh --zookeeper hadoop01:2181,hadoop02:2181,hadoop03:2181 --topic fluxdata --from-beginning

Then visit the page again and watch the consumer output.
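If your Kafka release no longer ships a console consumer that accepts the --zookeeper flag (newer versions use --bootstrap-server instead), the topic can also be checked from code. The following is a minimal sketch only, assuming the kafka_2.12 client dependency that the project pom adds in the next section; the object name FluxTopicCheck and the consumer group flux-check are made up for illustration.

package cn.yang.check

import java.time.Duration
import java.util.{Collections, Properties}
import org.apache.kafka.clients.consumer.KafkaConsumer

object FluxTopicCheck {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    // Same broker list as the Flume Kafka sink above
    props.put("bootstrap.servers", "hadoop01:9092,hadoop02:9092,hadoop03:9092")
    props.put("group.id", "flux-check")
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    // Read from the beginning of the topic, like --from-beginning
    props.put("auto.offset.reset", "earliest")
    val consumer = new KafkaConsumer[String, String](props)
    consumer.subscribe(Collections.singletonList("fluxdata"))
    try {
      // Poll and print until stopped with Ctrl+C
      while (true) {
        val records = consumer.poll(Duration.ofSeconds(1))
        val it = records.iterator()
        while (it.hasNext) {
          val r = it.next()
          println(s"partition=${r.partition()} offset=${r.offset()} value=${r.value()}")
        }
      }
    } finally {
      consumer.close()
    }
  }
}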
IV. Setting Up the Real-Time Streaming Environment

1. Spark and HBase integration basics

Implementation steps:

1. Start IDEA.
2. Create a Maven project from the quickstart archetype.
3. Install the Scala plugin in IDEA.
4. Add a Scala SDK to the FluxStreamingServer project. If Spark is a 2.x release, use Scala 2.11.7 for stability; for Spark 3.x we can use Scala 2.12.x.
5. Create a scala directory and mark it as a sources root.
6. Add the project pom:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>org.example</groupId>
  <artifactId>FluxStreamingServer</artifactId>
  <version>1.0-SNAPSHOT</version>
  <name>FluxStreamingServer</name>
  <!-- FIXME change it to the project's website -->
  <url>http://www.example.com</url>
  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <maven.compiler.source>1.7</maven.compiler.source>
    <maven.compiler.target>1.7</maven.compiler.target>
  </properties>
  <dependencies>
    <dependency><groupId>junit</groupId><artifactId>junit</artifactId><version>4.11</version><scope>test</scope></dependency>
    <!-- spark -->
    <dependency><groupId>org.apache.spark</groupId><artifactId>spark-core_2.12</artifactId><version>3.1.2</version></dependency>
    <dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming_2.12</artifactId><version>3.1.1</version></dependency>
    <dependency><groupId>org.apache.spark</groupId><artifactId>spark-sql_2.12</artifactId><version>3.1.2</version></dependency>
    <dependency><groupId>org.apache.spark</groupId><artifactId>spark-mllib_2.12</artifactId><version>3.1.2</version></dependency>
    <!-- kafka -->
    <dependency><groupId>org.apache.kafka</groupId><artifactId>kafka_2.12</artifactId><version>2.8.0</version></dependency>
    <dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming-kafka-0-10_2.12</artifactId><version>3.1.2</version></dependency>
    <!-- HBase -->
    <dependency><groupId>org.apache.hbase</groupId><artifactId>hbase</artifactId><version>2.4.2</version><type>pom</type></dependency>
    <dependency><groupId>org.apache.hbase</groupId><artifactId>hbase-client</artifactId><version>2.4.2</version></dependency>
    <dependency><groupId>org.apache.hbase</groupId><artifactId>hbase-common</artifactId><version>2.4.2</version></dependency>
    <dependency><groupId>org.apache.hbase</groupId><artifactId>hbase-server</artifactId><version>2.4.2</version></dependency>
    <dependency><groupId>org.apache.hbase</groupId><artifactId>hbase-protocol</artifactId><version>2.4.2</version></dependency>
    <dependency><groupId>org.apache.hbase</groupId><artifactId>hbase-hadoop-compat</artifactId><version>2.4.2</version></dependency>
    <dependency><groupId>org.apache.hbase</groupId><artifactId>hbase-mapreduce</artifactId><version>2.4.2</version></dependency>
    <!-- mysql -->
    <dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>5.1.38</version></dependency>
    <dependency><groupId>com.mchange</groupId><artifactId>c3p0</artifactId><version>0.9.5.5</version></dependency>
  </dependencies>
  <build>
    <pluginManagement>
      <!-- lock down plugins versions to avoid using Maven defaults (may be moved to parent pom) -->
      <plugins>
        <!-- clean lifecycle, see https://maven.apache.org/ref/current/maven-core/lifecycles.html#clean_Lifecycle -->
        <plugin><artifactId>maven-clean-plugin</artifactId><version>3.1.0</version></plugin>
        <!-- default lifecycle, jar packaging: see https://maven.apache.org/ref/current/maven-core/default-bindings.html#Plugin_bindings_for_jar_packaging -->
        <plugin><artifactId>maven-resources-plugin</artifactId><version>3.0.2</version></plugin>
        <plugin><artifactId>maven-compiler-plugin</artifactId><version>3.8.0</version></plugin>
        <plugin><artifactId>maven-surefire-plugin</artifactId><version>2.22.1</version></plugin>
        <plugin>
          <artifactId>maven-jar-plugin</artifactId>
          <version>3.0.2</version>
          <configuration>
            <archive>
              <manifest>
                <addClasspath>true</addClasspath>
                <useUniqueVersions>false</useUniqueVersions>
                <classpathPrefix>lib/</classpathPrefix>
                <mainClass>cn.tedu.streaming.StreamingDriver</mainClass>
              </manifest>
            </archive>
          </configuration>
        </plugin>
        <plugin><artifactId>maven-install-plugin</artifactId><version>2.5.2</version></plugin>
        <plugin><artifactId>maven-deploy-plugin</artifactId><version>2.8.2</version></plugin>
        <!-- site lifecycle, see https://maven.apache.org/ref/current/maven-core/lifecycles.html#site_Lifecycle -->
        <plugin><artifactId>maven-site-plugin</artifactId><version>3.7.1</version></plugin>
        <plugin><artifactId>maven-project-info-reports-plugin</artifactId><version>3.0.0</version></plugin>
      </plugins>
    </pluginManagement>
  </build>
</project>

7. Learn the basics of Spark and HBase integration. Create a new object with the following code:

package cn.yang.basic

import org.apache.hadoop.fs.shell.find.Result
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.{SparkConf, SparkContext}

/**
 * How to write data out to an HBase table with Spark
 */
object HBaseWriteDriver {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local").setAppName("writeHBase")
    val sc = new SparkContext(conf)
    // Set the ZooKeeper quorum. Make sure the host names map to the right server IPs
    sc.hadoopConfiguration.set("hbase.zookeeper.quorum", "hadoop01,hadoop02,hadoop03")
    // ZooKeeper client port
    sc.hadoopConfiguration.set("hbase.zookeeper.property.clientPort", "2181")
    // Target HBase table
    sc.hadoopConfiguration.set(TableOutputFormat.OUTPUT_TABLE, "tbx")
    // Create the Hadoop Job object
    val job = new Job(sc.hadoopConfiguration)
    // Output key type
    job.setOutputKeyClass(classOf[ImmutableBytesWritable])
    // Output value type. Import: org.apache.hadoop.fs.shell.find.Result
    job.setOutputValueClass(classOf[Result])
    // Output format type
    job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])
    val rdd = sc.parallelize(List("1 tom 23", "2 rose 18", "3 jim 25", "4 jary 30"))
    // To insert into HBase, convert RDD[String] -> RDD[(output key, output value)]
    val hbaseRDD = rdd.map { line =>
      val arr = line.split(" ")
      val id = arr(0)
      val name = arr(1)
      val age = arr(2)
      // Create an HBase row object with the row key. Import: org.apache.hadoop.hbase.client.Put
      val row = new Put(id.getBytes())
      // args: column family, column name, column value
      row.addColumn("cf1".getBytes(), "name".getBytes(), name.getBytes())
      row.addColumn("cf1".getBytes(), "age".getBytes(), age.getBytes())
      (new ImmutableBytesWritable(), row)
    }
    // Execute the insert
    hbaseRDD.saveAsNewAPIHadoopDataset(job.getConfiguration)
  }
}

8. Start the three servers and bring up ZooKeeper, Hadoop, and HBase:

cd /home/software/hbase-2.4.2/bin/
sh start-hbase.sh

9. On node 01, open the HBase shell and create the table (for this example, create 'tbx','cf1', matching the table name and column family used in the code):

sh hbase shell

10. Run the code. The code for reading and scanning HBase is attached below:

package cn.yang.basic

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.spark.{SparkConf, SparkContext}

object HBaseReadDriver {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local").setAppName("read")
    val sc = new SparkContext(conf)
    // Create the HBase configuration object
    val hbaseConf = HBaseConfiguration.create()
    hbaseConf.set("hbase.zookeeper.quorum", "hadoop01,hadoop02,hadoop03")
    hbaseConf.set("hbase.zookeeper.property.clientPort", "2181")
    // Table to read from
    hbaseConf.set(TableInputFormat.INPUT_TABLE, "tbx")
    // Execute the read and load the HBase table into an RDD
    val resultRDD = sc.newAPIHadoopRDD(hbaseConf,
      classOf[TableInputFormat],
      classOf[ImmutableBytesWritable],
      // import org.apache.hadoop.hbase.client.Result
      classOf[Result])
    resultRDD.foreach { case (k, v) =>
      val name = v.getValue("cf1".getBytes(), "name".getBytes())
      val age = v.getValue("cf1".getBytes(), "age".getBytes())
      println(new String(name) + ":" + new String(age))
    }
  }
}

package cn.yang.basic

import org.apache.commons.codec.binary.Base64
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.{Result, Scan}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.protobuf.ProtobufUtil
import org.apache.spark.{SparkConf, SparkContext}

/**
 * How to scan HBase table data
 */
object HBaseScanDriver {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local").setAppName("read")
    val sc = new SparkContext(conf)
    // Create the HBase configuration object
    val hbaseConf = HBaseConfiguration.create()
    hbaseConf.set("hbase.zookeeper.quorum", "hadoop01,hadoop02,hadoop03")
    hbaseConf.set("hbase.zookeeper.property.clientPort", "2181")
    // Table to read from
    hbaseConf.set(TableInputFormat.INPUT_TABLE, "tbx")
    // Create the HBase scan object
    val scan = new Scan()
    // Start row key of the scan
    scan.setStartRow("2".getBytes())
    // Stop row key of the scan (inclusive start, exclusive stop)
    scan.setStopRow("4".getBytes())
    // Register the scan object so it takes effect
    hbaseConf.set(TableInputFormat.SCAN, Base64.encodeBase64String(ProtobufUtil.toScan(scan).toByteArray()))
    // Execute the read and load the HBase table into an RDD
    val resultRDD = sc.newAPIHadoopRDD(hbaseConf,
      classOf[TableInputFormat],
      classOf[ImmutableBytesWritable],
      // import org.apache.hadoop.hbase.client.Result
      classOf[Result])
    resultRDD.foreach { case (k, v) =>
      val name = v.getValue("cf1".getBytes(), "name".getBytes())
      val age = v.getValue("cf1".getBytes(), "age".getBytes())
      println(new String(name) + ":" + new String(age))
    }
  }
}
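The tbx table above (and the webtable table used in the next section) is created in the HBase shell. As an alternative, a table can also be created from code with the HBase 2.x Admin API. The following is a minimal sketch only; the object name CreateHBaseTable is made up for illustration, and the ZooKeeper settings simply mirror the ones used throughout this tutorial.

package cn.yang.basic

import org.apache.hadoop.hbase.client.{ColumnFamilyDescriptorBuilder, ConnectionFactory, TableDescriptorBuilder}
import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}

object CreateHBaseTable {
  def main(args: Array[String]): Unit = {
    val conf = HBaseConfiguration.create()
    conf.set("hbase.zookeeper.quorum", "hadoop01,hadoop02,hadoop03")
    conf.set("hbase.zookeeper.property.clientPort", "2181")
    val conn = ConnectionFactory.createConnection(conf)
    try {
      val admin = conn.getAdmin
      val table = TableName.valueOf("tbx")
      if (!admin.tableExists(table)) {
        // One column family, cf1, matching what HBaseWriteDriver writes
        val descriptor = TableDescriptorBuilder.newBuilder(table)
          .setColumnFamily(ColumnFamilyDescriptorBuilder.of("cf1"))
          .build()
        admin.createTable(descriptor)
      }
      admin.close()
    } finally {
      conn.close()
    }
  }
}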
2. Real-Time Stream Processing

Implementation steps:

① Start the three servers, then start ZooKeeper, Hadoop, Kafka, and Flume.

cd /home/software/kafka_2.10-0.10.0.1/bin/
sh kafka-server-start.sh ../config/server.properties

Start Flume from its data directory:

../bin/flume-ng agent -n a1 -c ./ -f ./weblog.conf -Dflume.root.logger=INFO,console

② Integrate Spark Streaming with Kafka and write the code. Under the scala sources of FluxStreamingServer, create a new package named streaming and a new object named Driver, then add the code. The complete code, organized by structure, follows.

Driver:

package cn.yang.streaming

import cn.yang.TongjiBean
import cn.yang.dao.{HBaseUtil, MysqlUtil}
import cn.yang.pojo.LogBean
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}

import java.util.Calendar

object Driver {
  def main(args: Array[String]): Unit = {
    // To consume from Kafka with Spark Streaming, at least 2 threads are needed:
    // one runs Spark Streaming itself, the other consumes from Kafka.
    // The serializer also has to be set.
    val conf = new SparkConf().setMaster("local[2]").setAppName("kafkasource")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    val sc = new SparkContext(conf)
    sc.setLogLevel("error")
    // Create the StreamingContext and set the batch interval
    val ssc = new StreamingContext(sc, Seconds(5))
    // Topics to consume; Array allows subscribing to several topics
    val topics = Array("fluxdata")
    // Kafka parameters as a Map of property name -> property value.
    // Required: broker list, key/value deserializers (String here), and the consumer group id
    val kafkaParams: Map[String, Object] = Map[String, Object](
      "bootstrap.servers" -> "hadoop01:9092,hadoop02:9092,hadoop03:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "bos")
    // arg 1: the StreamingContext; arg 2: consume all partitions of the given topics;
    // arg 3: the Kafka subscription info
    val stream = KafkaUtils.createDirectStream[String, String](
      ssc,
      PreferConsistent,
      Subscribe[String, String](topics, kafkaParams)).map(x => x.value())
    // Output option one:
    // foreachRDD turns all the data of the current batch into a single RDD
    stream.foreachRDD { rdd =>
      // RDD[String] -> Iterator[String]
      val lines = rdd.toLocalIterator
      // Walk the iterator
      while (lines.hasNext) {
        // Take one record
        val line = lines.next()
        // Step 1: field cleaning. Required fields: url urlname uvid ssid sscount sstime cip
        val arr = line.split("\\|")
        val url = arr(0)
        val urlname = arr(1)
        val uvid = arr(13)
        val ssid = arr(14).split("_")(0)
        val sscount = arr(14).split("_")(1)
        val sstime = arr(14).split("_")(2)
        val cip = arr(15)
        // Step 2: wrap the cleaned fields into a bean
        val logBean = LogBean(url, urlname, uvid, ssid, sscount, sstime, cip)
        // Step 3: compute the real-time metrics pv uv vv newip newcust.
        // For a single record each of these is either 1 or 0.
        // 3-1 pv: page views. One visit counts as 1 pv
        val pv = 1
        // 3-2 uv: unique visitors. uv=1 or uv=0:
        // ① take the current record's uvid and query today's data in the HBase table webtable
        // ② if no row with this uvid is found, uv=1
        // ③ if a row is found, uv=0
        // The tricky part: how to query only "today's" data in HBase.
        // Query start timestamp startTime = today's midnight; end timestamp endTime = sstime
        val endTime = sstime.toLong
        val calendar = Calendar.getInstance()
        calendar.setTimeInMillis(endTime)
        // use HOUR_OF_DAY (24-hour clock) so afternoon events still map back to 00:00
        calendar.set(Calendar.HOUR_OF_DAY, 0)
        calendar.set(Calendar.MINUTE, 0)
        calendar.set(Calendar.SECOND, 0)
        calendar.set(Calendar.MILLISECOND, 0)
        // Midnight timestamp of the current day
        val startTime = calendar.getTimeInMillis
        // To check whether this uvid already appears in the HBase table,
        // use HBase's row-key regex filter
        val uvRegex = "^\\d+_" + uvid + ".*$"
        val uvRDD = HBaseUtil.queryHBase(sc, startTime, endTime, uvRegex)
        val uv = if (uvRDD.count() == 0) 1 else 0
        // 3-3 vv: unique sessions. vv=1 or vv=0, same logic as uv,
        // except the indicator is the current record's ssid
        val vvRegex = "^\\d+_\\d+_" + ssid + ".*$"
        val vvResult = HBaseUtil.queryHBase(sc, startTime, endTime, vvRegex)
        val vv = if (vvResult.count() == 0) 1 else 0
        // 3-4 newip: new IP. newip=1 or newip=0:
        // query the historical data (including today) with the current record's IP;
        // if nothing is found, newip=1, otherwise newip=0
        val ipRegex = "^\\d+_\\d+_\\d+_" + cip + ".*$"
        // startTime 0 means: search the whole history, including today
        val ipResult = HBaseUtil.queryHBase(sc, 0, endTime, ipRegex)
        val newip = if (ipResult.count() == 0) 1 else 0
        // 3-5 newcust: new visitors. Same logic as newip,
        // with uvid as the indicator; reuse uvRegex
        val custResult = HBaseUtil.queryHBase(sc, 0, endTime, uvRegex)
        val newcust = if (custResult.count() == 0) 1 else 0
        // Step 4: wrap the computed metrics into a bean and insert it into MySQL
        val tongjiBean = TongjiBean(sstime, pv, uv, vv, newip, newcust)
        MysqlUtil.saveToMysql(tongjiBean)
        // Also store the log bean in HBase for later queries
        HBaseUtil.saveToHBase(sc, logBean)
        println(logBean)
      }
    }
    // Output option two:
    // stream.print()
    ssc.start()
    // Keep Spark Streaming running until the user stops it
    ssc.awaitTermination()
  }
}

dao - HBaseUtil:

package cn.yang.dao

import cn.yang.pojo.LogBean
import org.apache.commons.codec.binary.Base64
import org.apache.hadoop.fs.shell.find.Result
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.{Put, Scan}
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp
import org.apache.hadoop.hbase.filter.{RegexStringComparator, RowFilter}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.{TableInputFormat, TableOutputFormat}
import org.apache.hadoop.hbase.protobuf.ProtobufUtil
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.SparkContext

import scala.util.Random

object HBaseUtil {

  // Range query combined with a row-key regex; returns the matching rows as an RDD
  def queryHBase(sc: SparkContext, startTime: Long, endTime: Long, regex: String) = {
    val hbaseConf = HBaseConfiguration.create()
    hbaseConf.set("hbase.zookeeper.quorum", "hadoop01,hadoop02,hadoop03")
    hbaseConf.set("hbase.zookeeper.property.clientPort", "2181")
    // Table to read from
    hbaseConf.set(TableInputFormat.INPUT_TABLE, "webtable")
    val scan = new Scan()
    scan.withStartRow(startTime.toString().getBytes)
    scan.withStopRow(endTime.toString().getBytes)
    // org.apache.hadoop.hbase.filter.RowFilter
    val filter = new RowFilter(CompareOp.EQUAL, new RegexStringComparator(regex))
    // Attach the filter so the range scan also applies the row-key regex
    scan.setFilter(filter)
    // Register the scan object so it takes effect
    hbaseConf.set(TableInputFormat.SCAN, Base64.encodeBase64String(ProtobufUtil.toScan(scan).toByteArray()))
    // Execute the read and return the result RDD
    val resultRDD = sc.newAPIHadoopRDD(hbaseConf,
      classOf[TableInputFormat],
      classOf[ImmutableBytesWritable],
      classOf[org.apache.hadoop.hbase.client.Result])
    resultRDD
  }

  // Save a LogBean into the HBase table
  def saveToHBase(sc: SparkContext, logBean: LogBean): Unit = {
    sc.hadoopConfiguration.set("hbase.zookeeper.quorum", "hadoop01,hadoop02,hadoop03")
    sc.hadoopConfiguration.set("hbase.zookeeper.property.clientPort", "2181")
    sc.hadoopConfiguration.set(TableOutputFormat.OUTPUT_TABLE, "webtable")
    val job = new Job(sc.hadoopConfiguration)
    job.setOutputKeyClass(classOf[ImmutableBytesWritable])
    job.setOutputValueClass(classOf[Result])
    job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])
    val rdd = sc.parallelize(List(logBean))
    val hbaseRDD = rdd.map { bean =>
      // The row key in this project is sstime_uvid_ssid_cip_randomNumber.
      // Starting the row key with the timestamp lets HBase sort rows by time (ascending),
      // which makes later range queries by time period easy.
      // The row key also contains uvid, ssid, and cip so metrics such as uv and vv can be computed from it.
      // The random number keeps the keys spread out (salting).
      val rowKey = bean.sstime + "_" + bean.uvid + "_" + bean.ssid + "_" + bean.cip + "_" + Random.nextInt(100)
      // Create an HBase row object with the row key
      val row = new Put(rowKey.getBytes)
      row.addColumn("cf1".getBytes, "url".getBytes, bean.url.getBytes)
      row.addColumn("cf1".getBytes, "urlname".getBytes, bean.urlname.getBytes)
      row.addColumn("cf1".getBytes, "uvid".getBytes, bean.uvid.getBytes)
      row.addColumn("cf1".getBytes, "ssid".getBytes, bean.ssid.getBytes)
      row.addColumn("cf1".getBytes, "sscount".getBytes, bean.sscount.getBytes)
      row.addColumn("cf1".getBytes, "sstime".getBytes, bean.sstime.getBytes)
      row.addColumn("cf1".getBytes, "cip".getBytes, bean.cip.getBytes)
      (new ImmutableBytesWritable, row)
    }
    // Execute the write
    hbaseRDD.saveAsNewAPIHadoopDataset(job.getConfiguration)
  }
}

dao - MysqlUtil:

package cn.yang.dao

import cn.yang.TongjiBean
import com.mchange.v2.c3p0.ComboPooledDataSource

import java.sql.{Connection, PreparedStatement, ResultSet}
import java.text.SimpleDateFormat

object MysqlUtil {

  val c3p0 = new ComboPooledDataSource()

  def saveToMysql(tongjiBean: TongjiBean): Unit = {
    var conn: Connection = null
    var ps1: PreparedStatement = null
    var rs: ResultSet = null
    var ps2: PreparedStatement = null
    var ps3: PreparedStatement = null
    try {
      conn = c3p0.getConnection
      /*
       * Logic:
       * 1. Query today's row in the MySQL table tongji2
       * 2. If there is no row for today yet, insert one;
       *    if there is, update it by accumulating the new values
       */
      // Extract today's date, e.g. 2022-04-24
      val sdf = new SimpleDateFormat("yyyy-MM-dd")
      val todayTime = sdf.format(tongjiBean.sstime.toLong)
      // Query tongji2 first: if today's row exists, accumulate; otherwise insert
      ps1 = conn.prepareStatement("select * from tongji2 where reporttime=?")
      ps1.setString(1, todayTime)
      // Run the query and get the result set
      rs = ps1.executeQuery()
      if (rs.next()) {
        // A row for today already exists: accumulate
        ps2 = conn.prepareStatement(
          "update tongji2 set pv=pv+?,uv=uv+?,vv=vv+?,newip=newip+?,newcust=newcust+? where reporttime=?")
        ps2.setInt(1, tongjiBean.pv)
        ps2.setInt(2, tongjiBean.uv)
        ps2.setInt(3, tongjiBean.vv)
        ps2.setInt(4, tongjiBean.newip)
        ps2.setInt(5, tongjiBean.newcust)
        ps2.setString(6, todayTime)
        ps2.executeUpdate()
      } else {
        // No row for today yet: insert one
        ps3 = conn.prepareStatement("insert into tongji2 values(?,?,?,?,?,?)")
        ps3.setString(1, todayTime)
        ps3.setInt(2, tongjiBean.pv)
        ps3.setInt(3, tongjiBean.uv)
        ps3.setInt(4, tongjiBean.vv)
        ps3.setInt(5, tongjiBean.newip)
        ps3.setInt(6, tongjiBean.newcust)
        ps3.executeUpdate()
      }
    } catch {
      case t: Exception => t.printStackTrace()
    } finally {
      if (ps3 != null) ps3.close
      if (ps2 != null) ps2.close
      if (rs != null) rs.close
      if (ps1 != null) ps1.close
      if (conn != null) conn.close
    }
  }
}

pojo - LogBean:

package cn.yang.pojo

case class LogBean(url: String, urlname: String, uvid: String, ssid: String, sscount: String, sstime: String, cip: String)

TongjiBean:

package cn.yang

case class TongjiBean(sstime: String, pv: Int, uv: Int, vv: Int, newip: Int, newcust: Int)

③ Start the Spark Streaming job.

④ Start Tomcat and visit the instrumented page to check that Spark Streaming receives the data.

⑤ Start HBase and create the table:

cd /home/software/hbase-2.4.2/bin/
sh start-hbase.sh
sh hbase shell

create 'webtable','cf1'

⑥ Start Tomcat, start the Driver as a test, then scan the webtable table and confirm that it now contains data.

⑦ In MySQL, under the weblog database, create the table:

create table tongji2(reporttime date,pv int,uv int,vv int,newip int,newcust int);

⑧ Run the program, visit the instrumented page, and finally check the data in MySQL. This is the end result of the project: the statistics are stored in the MySQL database.