数据质量的好与坏直接关系到我们最终的数据分析结果的正确与否。
如果想要保证数据的高质量,我们需要对数据进行清洗,清洗有两个作用:
1、将数据质量不好的数据清洗掉,过滤掉不合法的数据
2、将原始数据中的某些信息转换成我们容易操作的字段或者模型信息,将数据中的某些数据的格式进行转换,以便我们后期处理。
要想了解点击流数据,我们需要知道web中的Session会话什么时候会过期?
1、浏览器关闭
2、session设置的时间到了(用户进入网站后长时间未操作,等设置的超时时间到了,用户需要重新点击,再开启一次新的会话)
3、服务器终止会话
4、服务器宕机(所有会话全部过期)
5、登录网站之后退出网站,会话也会过期
数据预处理/数据清洗(本案例是分析用户使用网站产生的点击流)
日志信息:
120.191.181.178? -? -? 2018-02-18? 20:24:39? "POST? https://www.taobao.com/item/bHTTP/1.1"? 203? 69172? https://www.taobao.com/register? UCBrowser? Webkit X3android? 8.0? 海南? 20.02? 110.20? 36
第一步:将不合法的数据舍弃掉(MR程序)
?做清洗操作,只需要一个map阶段即可,不需要reduce(不涉及聚合操作)
MR程序的工作流程: 1、根据文件大小,按照切片机制将文件切成对应的切片信息,生成切片文件,并且和MR程序的配置文件一起提交到一个路径 2、程序提交给YARN运行,开启ApplicationMaster。ApplicationMaster下载第一步提交的资源信息 3、Application根据资源信息、切片信息,生成对应的MapTask任务,YARN帮助启动Maptask任务 4、MapTask任务需要根据给它划分的切片信息去对应的文件中读取切片数据成key-value键值对
mapper阶段需要传入四个泛型参数 四个泛型参数分别是: ????????keyIn:maptask任务读取切片数据信息成keyvalue键值对的时候key的类型 ????????valueIn:maptask任务读取切片数据信息成keyvalue键值对的时候value的类型 ????????keyOut:MapTask处理完对应数据之后 输出的时候key的类型 ????????valueOut:输出的时候 value的类型
在MR程序默认情况下? mapper阶段输入的key-value是什么类型的?分别代表什么含义?
不管切片机制还是mapper阶段输入的key-value类型都是由InputFormat决定的。
默认情况下,我们使用的InputFormat是TextInputFormat这个输入格式类 ????????1、读取key、value的类型分别是LongWritable、Text类型,分别代表LongWritable代表的是每一行的首字符在文件中属于第几个字符、偏移量, value代表每一行的内容。 ????????2、TextInputFormat的切片机制: ???????? ① 按照文件切片,一个文件一个文件切 ???????? ② 具体切割。首先判断文件能不能被切片,比如像一些压缩格式文件无法切片,如果不能切片,那么当前文件就是一个切片, 如果可以切割 那么按照如下规则切片: ?????? ? ③ 获取文件的大小、获取 minSize、maxSize 、BlockSize ???????? 计算出切片的大小 :splitSize = Math.max(minSize,Math.Min(blockSize,maxSize)); ???????? 判断文件的大小有没有超过splitSize的1.1倍,如果超过,那么切片;如果没超过 ,那么不切片。
/**
* 日志清洗:
* MR程序清洗日志信息
* mr程序读取采集到的日志数据 按行读取 并且按照空格进行切割,得到字段
* 1、如果字段的长度个数16 那么数据直接舍弃
* 2、如果ip地址和时间不存在 直接舍弃掉
* 3、如果对应的状态大于400 直接数据舍弃掉
* 4、对应的数据没有省份 维度 经度 年龄 直接舍弃掉
*
* 清洗操作 只需要一个map阶段即可 不需要reduce
*/
public class WebLogCleaner {
public static void main(String[] args) throws IOException, URISyntaxException, InterruptedException, ClassNotFoundException {
//Driver
Configuration conf = new Configuration();
conf.set("fs.defaultFS","hdfs://192.168.10.4:9000");
/**
* 获取job对象 封装作业
*/
Job job = Job.getInstance(conf);
//设置jar的位置
job.setJarByClass(WebLogCleaner.class);
//封装Mapper类
job.setMapperClass(WebLogCleanMapper.class);
//因为这个MR程序只有Mapper没有reducer所以我们需要设置reduceTask为0 同时直接设置最终输出key value类型即可
job.setNumReduceTasks(0);
job.setOutputKeyClass(NullWritable.class);
job.setOutputValueClass(WebLogBean.class);
//封装输入和输出文件
/**
* 输入的路径不必指定到我们的具体的路径,只需要将数据指定到我们的最开头的父目录即可
* d第一个* 代表是所有%y-%d-%m的文件夹路径
* 第二个* 代表的是%H的文件夹
*/
FileInputFormat.setInputPaths(job,new Path("/project/dataCollect/*/*/"));
FileSystem fs = FileSystem.get(new URI("hdfs://192.168.10.4:9000"), conf, "root");
/**
* 校验输出路径是否存在 存在删除 防止报错
*/
Path outPath = new Path("/output");
if (fs.exists(outPath)){
fs.delete(outPath,true);
}
//设置输出路径
FileOutputFormat.setOutputPath(job,outPath);
//提交运行
boolean b = job.waitForCompletion(true);
System.exit(b?0:1);
}
}
/*
自定义一个JavaBean对象 接受读取的数据 输出的时候 输出我们的javaBean对象即可 只需要在JavaBean对象中重写toString()方法*/
class WebLogCleanMapper extends Mapper<LongWritable, Text, NullWritable,WebLogBean>{
/**
*
* @param key ----每一行首字符的偏移量--首字符在文件中属于第几个字符
* @param value ----每一行的数据
* @param context ----上下文对象 将map的输出数据发送到下一个阶段
* @throws IOException
* @throws InterruptedException
*/
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
//自定义计数器 查看处理多少条数据
context.getCounter("myCounter","totalData").increment(1);
String line = value.toString();
String[] fields = line.split(" ");
//第一个过滤的数据 当数据切割之后字段小于16个 证明里面肯定有缺失的数据 不合法
if (fields.length<16){
return;
}
/**
* 21.1.198.247 - - 2018-12-28 12:18:05 "GET https://www.taobao.com/register HTTP/1.1"
* 101 70117 https://www.taobao.com/index Windows Internet Explorer Tridentwindows
* 辽宁 41.48 123.25 19
*/
//合法的话就封装对象
WebLogBean webLogBean = new WebLogBean();
webLogBean.setRemote_addr(fields[0]);
//封装时间的
if (fields[3].equals("-")){
webLogBean.setValid(false);
}else{
webLogBean.setLocal_time(fields[3]+" "+fields[4]);//设置时间的
}
webLogBean.setRequest(fields[6]);//设置请求
webLogBean.setStatus(Integer.parseInt(fields[8]));//设置状态码的
webLogBean.setBody_bytes(Integer.parseInt(fields[9]));//设置我们的字节数
webLogBean.setReferer_url(fields[10]);//设置来源URL
/**
* 设置浏览器信息---如果一行数据切割的字段的长度大于16的话 浏览器信息在第12个字段开始
* 到length-4的位置都是浏览器信息
*/
String userAgent = "";
for (int i = 11; i < fields.length-4; i++) {
userAgent+=fields[i];
}
webLogBean.setUser_agent(userAgent);
//这是最后四个信息 省份 纬度 经度 年龄 因为前面有一个字段是浏览器信息 但是长度不固定
//所以我们这四个子端需要倒着取
webLogBean.setProvince(fields[fields.length-4]);
webLogBean.setLatitude(fields[fields.length-3]);
webLogBean.setLongitude(fields[fields.length-2]);
webLogBean.setAge(Integer.parseInt(fields[fields.length-1]));
/**
* 通过这个方法然后将我们的这个数据进行数据合法性校验
*/
weblogparser(webLogBean);
/**
* 过滤数据 ---判断对象的valid字段如果为false 那么数据不合法 舍弃掉
*/
if (!webLogBean.isValid()){
//自定义计数器 查看有多少条数据不合法
context.getCounter("myCounter","noValidData").increment(1);
return;//下面的代码不执行 当前方法直接结束
}
//自定义计数器 查看有多少条数据合法
context.getCounter("myCounter","validData").increment(1);
//如果能执行到这一步 代码数据合法 写出即可
context.write(NullWritable.get(),webLogBean);
}
/**
* 日志清洗代码
* @param webLogBean
*/
private void weblogparser(WebLogBean webLogBean) {
//如果状态码大于等于400把数据 标识成为不合法数据
if (webLogBean.getStatus()>=400){
webLogBean.setValid(false);
}
//如果省份为- 代码省份未采集到 那么数据缺失 标识为不合法
if (webLogBean.getProvince().equals("-")){
webLogBean.setValid(false);
}
//如果来源URL为- 代码数据确实 那么将数据标识为不合法数据
if (webLogBean.getReferer_url().equals("-")){
webLogBean.setValid(false);
}
}
}
自定义JavaBean对象,来接收读取的数据,并按照指定格式输出,不需要实现序列化接口。
/**
* 自定义的Javabean对象 用于接受读取的数据字段
* 188.33.49.99
* -
* -
* 2018-02-03
* 18:13:45
* "POST 请求方式
* https://www.taobao.com/search/
* HTTP/1.0" 协议
* 400
* 52471
* https://www.taobao.com/item/c
* Mozilla Firefox Geckowindows
* 吉林
* 43.54
* 125.19
* 32
*/
public class WebLogBean {
private boolean valid = true;// 这个字段是用来判断数据是否合法的 如果为false 不合法 需要舍弃的数据
private String remote_addr;//ip
private String local_time;//访问时间
private String request;//请求网址
private Integer status;//响应状态码
private Integer body_bytes;//响应字节数
private String referer_url;//来源URL
private String user_agent;//用户浏览器
private String province;//省份
private String latitude;//纬度
private String longitude;//经度
private Integer age;//年龄
public boolean isValid() {
return valid;
}
public void setValid(boolean valid) {
this.valid = valid;
}
public String getRemote_addr() {
return remote_addr;
}
public void setRemote_addr(String remote_addr) {
this.remote_addr = remote_addr;
}
public String getLocal_time() {
return local_time;
}
public void setLocal_time(String local_time) {
this.local_time = local_time;
}
public String getRequest() {
return request;
}
public void setRequest(String request) {
this.request = request;
}
public Integer getStatus() {
return status;
}
public void setStatus(Integer status) {
this.status = status;
}
public Integer getBody_bytes() {
return body_bytes;
}
public void setBody_bytes(Integer body_bytes) {
this.body_bytes = body_bytes;
}
public String getReferer_url() {
return referer_url;
}
public void setReferer_url(String referer_url) {
this.referer_url = referer_url;
}
public String getUser_agent() {
return user_agent;
}
public void setUser_agent(String user_agent) {
this.user_agent = user_agent;
}
public String getProvince() {
return province;
}
public void setProvince(String province) {
this.province = province;
}
public String getLatitude() {
return latitude;
}
public void setLatitude(String latitude) {
this.latitude = latitude;
}
public String getLongitude() {
return longitude;
}
public void setLongitude(String longitude) {
this.longitude = longitude;
}
public Integer getAge() {
return age;
}
public void setAge(Integer age) {
this.age = age;
}
/**
* toString方法是用来最后向文件中输出数据的格式
* @return
*/
@Override
public String toString() {
return valid+","+remote_addr+","+local_time+","+request+","+status+","+body_bytes+","+referer_url+","+user_agent+","+province+","+latitude+","+longitude+","+age;
}
}
如何打JAR包?MR程序打Jar包运行的两种打包方式:
1、如果是maven项目的话,点击idea右侧----maven----当前项目名---lifecycle----package就可以,jar包会放到我们项目的target目录下
?2、普通的打包方式:File----project structure---artifacts-----+----jar---->from module dependencies.....勾选include in project build, jar包在out目录下
?jar包的运行命令
hadoop jar jar包路径 全限定类名
|