一、Overview
(一) Business Background
Website traffic statistics are one of the key means of improving a website's services: by collecting users' behavior data on the site and analyzing it, valuable information is obtained, and the site can then be improved on that basis.
Online (real-time) analysis:
Visit time, visitor region, referring page, currently viewed page, and so on; these features greatly help a company keep a real-time picture of its own site traffic.
Time-period analysis:
Analysis over an arbitrary time range and at an arbitrary time granularity, such as the hourly distribution or the daily page-view distribution, gives a company a clear view of the periods in which users browse its pages.
Referrer analysis:
Referrer analysis reports, for each referring domain, the number of visits, IPs, unique visitors, new visitors, page views by new visitors, total on-site page views, and so on. It shows a company directly which referrers its promotion results come from, and therefore on which sites its advertising is more effective.
(二) Business Requirements
a. PV
PV (page view), also called hits: the number of times pages of the whole site are viewed within one day. Repeated visits to the same page count as separate PVs.
b. UV
UV (unique visitor): the number of distinct people who visit the site within one day. If the same person visits the site several times in one day, it still counts as a single UV.
c. VV
VV (visit view): the total number of sessions within one day. A session is the sequence of page views a browser makes on the site to accomplish something: it starts when the first page is opened and ends when the last page is closed, or when no further page is visited within a specified timeout.
d. BR
BR (bounce rate): the fraction of the day's sessions that bounced. A session is said to bounce when it visits only a single page before ending; the ratio of bounced sessions to all sessions is the bounce rate. This metric is very useful when evaluating the effect of promotion campaigns.
e. NewIP
NewIP: the number of new IPs, i.e. after deduplicating all IPs that visited the site in one day, how many of them never appeared in the historical data. This metric reflects, to some extent, the growth in new users of the site.
f. NewCust
NewCust: the number of new unique visitors, i.e. how many of the people who visited the site in one day never appeared in the historical records. This reflects user growth from another angle.
g. AvgTime
AvgTime: average visit duration. A session's duration is its end time minus its start time; AvgTime is the mean duration of all sessions in one day. This metric reflects how "sticky" the site is for its users.
h. AvgDeep
AvgDeep: average visit depth. A session's depth is the number of distinct resource URLs it visited; AvgDeep is the mean depth of all sessions in one day. It also reflects the site's stickiness. (See the sketch after this list for how these definitions map onto the log fields.)
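To make the definitions above concrete, here is a minimal sketch (not part of the original notes) of how the day-level metrics can be computed once the logs have been parsed. The Record fields mirror the LogBean fields used later in this project; the object and method names are hypothetical, and sstime is kept as a Long here purely for the time arithmetic. NewIP and NewCust are omitted because they require comparing against historical data (e.g. in HBase).

package cn.edu.metrics   // hypothetical package, for illustration only

object MetricsSketch {
  // one parsed page view; field names mirror the LogBean defined later in these notes
  case class Record(url: String, uvid: String, ssid: String, sstime: Long, cip: String)

  def dailyMetrics(day: Seq[Record]): Unit = {
    val pv = day.size                                   // PV: every page view counts
    val uv = day.map(_.uvid).distinct.size              // UV: distinct visitors
    val sessions = day.groupBy(_.ssid)                  // group page views into sessions
    val vv = sessions.size                              // VV: number of sessions
    val bounced = sessions.values.count(_.size == 1)    // sessions with a single page view
    val br = bounced.toDouble / vv                      // BR: bounced sessions / all sessions
    val avgTime = sessions.values                       // AvgTime: mean session duration (ms)
      .map(hits => hits.map(_.sstime).max - hits.map(_.sstime).min)
      .sum.toDouble / vv
    val avgDeep = sessions.values                       // AvgDeep: mean distinct URLs per session
      .map(hits => hits.map(_.url).distinct.size)
      .sum.toDouble / vv
    println(s"pv=$pv uv=$uv vv=$vv br=$br avgTime=$avgTime avgDeep=$avgDeep")
  }
}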
(三) Technical Architecture
二、Real-Time Stream Processing
(一) Steps to integrate SparkStreaming with Kafka
1、Start the virtual machines (2 or 3 nodes)
2、Start the ZooKeeper cluster: sh /home/software/zookeeper-3.4.7/bin/zkServer.sh start
3、Start the Hadoop cluster (HA)
On all three nodes, start the JournalNode: hadoop-daemon.sh start journalnode
On the first and second nodes, start the NameNode: hadoop-daemon.sh start namenode
On all three nodes, start the DataNode: hadoop-daemon.sh start datanode
On the first and second nodes, start zkfc (FailoverController): hadoop-daemon.sh start zkfc
On the third node, start Yarn: start-yarn.sh
On the first node, start the ResourceManager: yarn-daemon.sh start resourcemanager
4、Start the Kafka cluster
sh /home/software/kafka/bin/kafka-server-start.sh /home/software/kafka/config/server.properties
5、Create the required Kafka topic
[root@hadoop01 ~]# cd /home/software/kafka/bin/
[root@hadoop01 bin]# sh kafka-topics.sh --create --zookeeper hadoop01:2181 --replication-factor 1 --partitions 1 --topic logdata
6、Write and start the Flume agent
Write weblog.conf:
[root@hadoop01 conf]# vim weblog.conf
# Name the components on this agent
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2
# Describe/configure the source
a1.sources.r1.type = avro
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 44444
a1.sources.r1.interceptors=i1
a1.sources.r1.interceptors.i1.type=timestamp
# Describe the sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://hadoop01:9000/weblog/reportTime=%Y-%m-%d
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.rollInterval = 30
a1.sinks.k1.hdfs.rollSize = 0
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k2.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k2.brokerList=hadoop01:9092,hadoop02:9092,hadoop03:9092
a1.sinks.k2.topic=logdata
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1 c2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2
[root@hadoop01 conf]# cd /home/software/flume/bin/
[root@hadoop01 bin]# sh flume-ng agent --conf ../conf --conf-file ../conf/weblog.conf --name a1 -Dflume.root.logger=INFO,console
7、Start Tomcat (the page-tracking server and the log server)
8、Write the SparkStreaming-Kafka integration code to consume data from Kafka
① Start scala-eclipse ② Create a Scala project ③ Add the Spark dependency jars (add the contents of the two directories as dependencies) ④ Add the Kafka dependency jars ⑤ Add the Spark-Kafka integration jar ⑥ Create a Scala object named Driver
package cn.edu.streaming

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.kafka.KafkaUtils

object Driver {
  def main(args: Array[String]): Unit = {
    // local[3]: one core for the receiver, the rest for processing
    val conf = new SparkConf().setMaster("local[3]").setAppName("kafkasource")
    val sc = new SparkContext(conf)
    // 5-second batch interval
    val ssc = new StreamingContext(sc, Seconds(5))
    val zkHosts = "hadoop01:2181,hadoop02:2181,hadoop03:2181"
    val group = "gp2021"
    // topic name -> number of receiver threads
    val topics = Map("logdata" -> 1)
    // receiver-based Kafka stream; each element is a (key, message) pair
    val kafkaSource = KafkaUtils.createStream(ssc, zkHosts, group, topics)
    kafkaSource.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
9、Start the SparkStreaming job, visit the tracking page, and check whether SparkStreaming receives data from Kafka
Visit localhost:8080/FluxAppServer/a.jsp; the corresponding log record should appear in the SparkStreaming console output.
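As an optional sanity check for this step, the Driver above can also print how many records arrive in each 5-second batch. A small sketch (not in the original notes), to be placed before ssc.start():

    // optional sanity check for step 9
    val messages = kafkaSource.map(_._2)   // drop the Kafka message key, keep the raw log line
    messages.print()                       // show a few raw log lines per batch
    messages.count().print()               // show the number of records received in the batch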
(二) Steps to integrate Spark with HBase:
(Ⅰ) Writing data
1、Add the HBase dependency jars
2、Write the Spark-HBase integration code
package cn.edu.hbase

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.client.{Put, Result}

object WriteDriver {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local").setAppName("writer")
    val sc = new SparkContext(conf)
    // HBase connection info and target table
    sc.hadoopConfiguration.set("hbase.zookeeper.quorum", "hadoop01,hadoop02,hadoop03")
    sc.hadoopConfiguration.set("hbase.zookeeper.property.clientPort", "2181")
    sc.hadoopConfiguration.set(TableOutputFormat.OUTPUT_TABLE, "tbh")
    val job = new Job(sc.hadoopConfiguration)
    job.setOutputKeyClass(classOf[ImmutableBytesWritable])
    job.setOutputValueClass(classOf[Result])
    job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])
    // sample data: "id name age"
    val rdd = sc.parallelize(List("1 tom 23", "2 rose 18", "3 jim 25", "4 jsry 20"))
    val hbaseRDD = rdd.map { line =>
      val arr = line.split(" ")
      val id = arr(0)
      val name = arr(1)
      val age = arr(2)
      // row key = id; columns cf1:name and cf1:age
      val row = new Put(id.getBytes)
      row.add("cf1".getBytes, "name".getBytes, name.getBytes)
      row.add("cf1".getBytes, "age".getBytes, age.getBytes)
      (new ImmutableBytesWritable, row)
    }
    hbaseRDD.saveAsNewAPIHadoopDataset(job.getConfiguration)
  }
}
3、Start the virtual machines
4、Start the ZooKeeper cluster
5、Start the Hadoop cluster
6、Start the HBase cluster
[root@hadoop01 bin]# cd /home/software/hbase/bin/
[root@hadoop01 bin]# sh start-hbase.sh
7、Open the HBase shell and create the table
[root@hadoop01 bin]# sh hbase shell
hbase(main):001:0> create 'tbh','cf1'
hbase(main):002:0> scan 'tbh'
ROW COLUMN+CELL
0 row(s) in 0.0870 seconds
8、Run the Spark job and verify the result
hbase(main):003:0> scan 'tbh'
ROW COLUMN+CELL
1 column=cf1:age, timestamp=1630481015736, value=23
1 column=cf1:name, timestamp=1630481015736, value=tom
2 column=cf1:age, timestamp=1630481015736, value=18
2 column=cf1:name, timestamp=1630481015736, value=rose
3 column=cf1:age, timestamp=1630481015736, value=25
3 column=cf1:name, timestamp=1630481015736, value=jim
4 column=cf1:age, timestamp=1630481015736, value=20
4 column=cf1:name, timestamp=1630481015736, value=jsry
4 row(s) in 0.0880 seconds
hbase(main):004:0>
(Ⅱ) Reading data:
1、ReadDriver
package cn.edu.hbase

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.client.Result

object ReadDriver {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local").setAppName("read")
    val sc = new SparkContext(conf)
    // connection settings go on the HBaseConfiguration, not on sc.hadoopConfiguration
    // (see the troubleshooting section at the end)
    val hbaseConf = HBaseConfiguration.create()
    hbaseConf.set("hbase.zookeeper.quorum", "hadoop01,hadoop02,hadoop03")
    hbaseConf.set("hbase.zookeeper.property.clientPort", "2181")
    hbaseConf.set(TableInputFormat.INPUT_TABLE, "tbh")
    // read the whole table as an RDD of (row key, Result) pairs
    val resultRDD = sc.newAPIHadoopRDD(
      hbaseConf,
      classOf[TableInputFormat],
      classOf[ImmutableBytesWritable],
      classOf[Result])
    resultRDD.foreach { case (key, value) =>
      val name = value.getValue("cf1".getBytes(), "name".getBytes())
      val age = value.getValue("cf1".getBytes(), "age".getBytes())
      println(new String(name) + ":" + new String(age))
    }
  }
}
2、ScanDriver
package cn.edu.hbase

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.{Result, Scan}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.protobuf.ProtobufUtil
import org.apache.hadoop.hbase.util.Base64
import org.apache.spark.{SparkConf, SparkContext}

object ScanDriver {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local").setAppName("read")
    val sc = new SparkContext(conf)
    val hbaseConf = HBaseConfiguration.create()
    hbaseConf.set("hbase.zookeeper.quorum", "hadoop01,hadoop02,hadoop03")
    hbaseConf.set("hbase.zookeeper.property.clientPort", "2181")
    hbaseConf.set(TableInputFormat.INPUT_TABLE, "tbh")
    // scan row keys in [2, 4): the stop row is exclusive
    val scan = new Scan()
    scan.setStartRow("2".getBytes())
    scan.setStopRow("4".getBytes())
    // TableInputFormat reads its Scan from the configuration as a Base64-encoded protobuf
    hbaseConf.set(TableInputFormat.SCAN,
      Base64.encodeBytes(ProtobufUtil.toScan(scan).toByteArray()))
    val data = sc.newAPIHadoopRDD(
      hbaseConf,
      classOf[TableInputFormat],
      classOf[ImmutableBytesWritable],
      classOf[Result])
    data.foreach { case (key, value) =>
      val name = value.getValue("cf1".getBytes(), "name".getBytes())
      val age = value.getValue("cf1".getBytes(), "age".getBytes())
      println(new String(name) + ":" + new String(age))
    }
  }
}
(三) Further processing on top of (一)
1、Create LogBean
package cn.edu.pojo

// one parsed log record; all fields are kept as strings, matching the raw log format
case class LogBean(
  url: String,      // full URL of the visited page
  urlname: String,  // page name
  uvid: String,     // unique-visitor id (used for UV)
  ssid: String,     // session id
  sscount: String,  // visit count within the session
  sstime: String,   // timestamp of the hit
  cip: String       // client IP
)
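As an optional convenience (not part of the original notes), the parsing logic can also live next to the bean as a companion method placed in the same source file as the case class. This sketch assumes the same pipe-delimited format and field positions used by the Driver below (field 0 = url, 1 = urlname, 13 = uvid, 14 = "ssid_sscount_sstime", 15 = cip):

// hypothetical companion helper: build a LogBean from one raw log line
object LogBean {
  def parse(line: String): LogBean = {
    val arr = line.split("\\|")
    val Array(ssid, sscount, sstime) = arr(14).split("_")
    LogBean(arr(0), arr(1), arr(13), ssid, sscount, sstime, arr(15))
  }
}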
2、Create HBaseUtil and call it from the Driver
package cn.edu.streaming

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.kafka.KafkaUtils
import cn.edu.pojo.LogBean
import cn.edu.dao.HBaseUtil

object Driver {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[3]").setAppName("kafkasource")
    val sc = new SparkContext(conf)
    val ssc = new StreamingContext(sc, Seconds(5))
    val zkHosts = "hadoop01:2181,hadoop02:2181,hadoop03:2181"
    val group = "gp2021"
    val topics = Map("logdata" -> 1)
    // keep only the message value (the raw log line), drop the Kafka key
    val kafkaSource = KafkaUtils.createStream(ssc, zkHosts, group, topics).map(x => x._2)
    kafkaSource.foreachRDD { rdd =>
      // pull the batch back to the driver and handle the lines one by one
      val lines = rdd.toLocalIterator
      while (lines.hasNext) {
        val line = lines.next()
        // the log fields are separated by '|'; field 14 is "ssid_sscount_sstime"
        val arr = line.split("\\|")
        val url = arr(0)
        val urlname = arr(1)
        val uvid = arr(13)
        val ssid = arr(14).split("_")(0)
        val sscount = arr(14).split("_")(1)
        val sstime = arr(14).split("_")(2)
        val cip = arr(15)
        val logbean = LogBean(url, urlname, uvid, ssid, sscount, sstime, cip)
        // persist the parsed record to HBase
        HBaseUtil.saveToHbase(sc, logbean)
      }
    }
    kafkaSource.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
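The notes reference cn.edu.dao.HBaseUtil but do not show the class itself. Below is a minimal sketch of what saveToHbase could look like, following the same saveAsNewAPIHadoopDataset pattern as the WriteDriver in section (二); the table name "weblog", the column family "cf1" and the row-key layout are assumptions, not taken from the original notes.

package cn.edu.dao

import org.apache.spark.SparkContext
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.client.{Put, Result}
import cn.edu.pojo.LogBean

object HBaseUtil {
  // writes one record per call, matching how the Driver above invokes it
  def saveToHbase(sc: SparkContext, bean: LogBean): Unit = {
    sc.hadoopConfiguration.set("hbase.zookeeper.quorum", "hadoop01,hadoop02,hadoop03")
    sc.hadoopConfiguration.set("hbase.zookeeper.property.clientPort", "2181")
    sc.hadoopConfiguration.set(TableOutputFormat.OUTPUT_TABLE, "weblog")  // assumed table name
    val job = new Job(sc.hadoopConfiguration)
    job.setOutputKeyClass(classOf[ImmutableBytesWritable])
    job.setOutputValueClass(classOf[Result])
    job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])
    // assumed row key: timestamp + visitor id, so one day's records are clustered together
    val rowKey = bean.sstime + "_" + bean.uvid
    val put = new Put(rowKey.getBytes)
    put.add("cf1".getBytes, "url".getBytes, bean.url.getBytes)
    put.add("cf1".getBytes, "urlname".getBytes, bean.urlname.getBytes)
    put.add("cf1".getBytes, "uvid".getBytes, bean.uvid.getBytes)
    put.add("cf1".getBytes, "ssid".getBytes, bean.ssid.getBytes)
    put.add("cf1".getBytes, "sscount".getBytes, bean.sscount.getBytes)
    put.add("cf1".getBytes, "sstime".getBytes, bean.sstime.getBytes)
    put.add("cf1".getBytes, "cip".getBytes, bean.cip.getBytes)
    sc.parallelize(List((new ImmutableBytesWritable, put)))
      .saveAsNewAPIHadoopDataset(job.getConfiguration)
  }
}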
3、Driver variant that only parses and prints the LogBean (useful for verifying the field extraction before writing to HBase)
package cn.edu.streaming

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.kafka.KafkaUtils
import cn.edu.pojo.LogBean

object Driver {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[3]").setAppName("kafkasource")
    val sc = new SparkContext(conf)
    val ssc = new StreamingContext(sc, Seconds(5))
    val zkHosts = "hadoop01:2181,hadoop02:2181,hadoop03:2181"
    val group = "gp2021"
    val topics = Map("logdata" -> 1)
    // keep only the message value (the raw log line)
    val kafkaSource = KafkaUtils.createStream(ssc, zkHosts, group, topics).map(x => x._2)
    kafkaSource.foreachRDD { rdd =>
      val lines = rdd.toLocalIterator
      while (lines.hasNext) {
        val line = lines.next()
        val arr = line.split("\\|")
        val url = arr(0)
        val urlname = arr(1)
        val uvid = arr(13)
        val ssid = arr(14).split("_")(0)
        val sscount = arr(14).split("_")(1)
        val sstime = arr(14).split("_")(2)
        val cip = arr(15)
        val logbean = LogBean(url, urlname, uvid, ssid, sscount, sstime, cip)
        // only print the parsed record, to verify the parsing
        println(logbean)
      }
    }
    kafkaSource.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
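Once the lines are parsed into LogBean, the stream can already feed the simpler real-time metrics from section 一(二). The sketch below is not in the original notes; it counts page views and batch-local distinct visitors per batch, and a full implementation of UV/NewIP/NewCust would additionally have to look up historical data in HBase.

    // optional sketch: per-batch PV and (batch-local) UV, computed on the executors;
    // add alongside the foreachRDD above, before ssc.start()
    kafkaSource.foreachRDD { rdd =>
      val beans = rdd.map { line =>
        val arr = line.split("\\|")
        val sess = arr(14).split("_")
        LogBean(arr(0), arr(1), arr(13), sess(0), sess(1), sess(2), arr(15))
      }.cache()
      val pv = beans.count()                          // page views in this batch
      val uv = beans.map(_.uvid).distinct().count()   // distinct visitors seen in this batch
      println(s"batch pv=$pv, uv=$uv")
      beans.unpersist()
    }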
三、Problems Encountered and Solutions
(一) Error when running Flume
2021-09-01 11:01:08,307 (conf-file-poller-0) [ERROR - org.apache.flume.node.AbstractConfigurationProvider.loadSinks(AbstractConfigurationProvider.java:427)] Sink k2 has been removed due to an error during configuration
org.apache.flume.conf.ConfigurationException: brokerList must contain at least one Kafka broker
at org.apache.flume.sink.kafka.KafkaSinkUtil.addDocumentedKafkaProps(KafkaSinkUtil.java:55)
Cause: a typo in the configuration file; the property a1.sinks.k2.brokerList had been written as a1.sinks.brokeList.
(二) Error in ReadDriver
[2021-09-01 16:07:56,949] WARN Session 0x0 for server null, unexpected error, closing socket connection and attempting reconnect (org.apache.zookeeper.ClientCnxn:1102)
java.net.ConnectException: Connection refused: no further information
at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
Cause: the ZooKeeper quorum address and client port were configured incorrectly; they cannot simply be copied from WriteDriver. They should be set on the HBaseConfiguration:
hbaseConf.set("hbase.zookeeper.quorum","hadoop01,hadoop02,hadoop03")
hbaseConf.set("hbase.zookeeper.property.clientPort","2181")
rather than on the SparkContext's Hadoop configuration:
sc.hadoopConfiguration.set("hbase.zookeeper.quorum","hadoop01,hadoop02,hadoop03")
sc.hadoopConfiguration.set("hbase.zookeeper.property.clientPort","2181")