WordCount类
一、drver 类
- Configuration:与HDFS中的Configuration一致,负责参数的加载和传递
Job:作业,是对一轮MapReduce任务的抽象,即一个MapReduce的执行全过程的管理类 FileInputFormat:指定输入数据的工具类,用于指定任务的输入数据路径 FileOutputFormat:指定输出数据的工具类,用于指定任务的输出数据路径 - 实现
- 先得到集群的配置参数,用 Configuration类
- 将集群参数设置到本次的job实例中,Job类
- 指定本次执行的主类,也就是本drver类
- 指定map类、combiner类、reducer类
- 指定job输出的key和value的类型,如果map和reduce输出类型不完全相同,需要重新设置map的output的key和value的class类型
- 指定输入数据的路径、输出路径,并要求该输出路径一定是不存在的(他会读取指定目录下的所有文件,不能读目录,然后生成并输出在指定的输出目录下),FileInputFormat,FileOutputFormat
- 指定job执行模式,等待任务执行完成后,提交任务的客户端才会退出!
二、map类编写
- Mapper:是MapReduce计算框架中Map过程的封装
Text:Hadoop对Java String类的封装,适用于Hadoop对文本字符串的处理 IntWritable:Hadoop对JavaInteger类的封装,适用于Hadoop整型的处理 Context:Hadoop环境基于上下文的操作对象,如Map中key/value的输出、分布式缓存数据、分布式参数传递等 StringTokenizer:对String对象字符串的操作类,做基于空白字符的切分操作工具类 - 代码编写
- 类继承Mapper<Object, Text, Text, IntWritable>泛型里Object是偏移量,Text是传入文本,另一个是输出文本,IntWritable是词频
- 暂存每个词和它的的词频(可以避免下面重复申请耗费空间)
- 编写核心map方法操作(Object Text)
- 引入一个Context对象,可以创建也可以在入参列表里加一个
- 用传进来的Text类型的每行文本,初始化StringTokenizer相当于分割成字符数组
- 循环取得每个分割出来的元素放到刚才定义过的词对象中
- 通过Context对象将map输出(词 , 词频)
三、reduce类编写
- Reducer:是MapReduce计算框架中Reduce过程的封装
- 实现
- 类继承Reducer<Text, IntWritable, Text, IntWritable>,接收shuffle过来的k,v对,它有一个GroupingComparator分组,把k相同的合并一起,把v做成一个集合,成<k,list(values)>形式,传给下面的reduce方法,GroupingComparator在分组问题中可以通过重写方法实现.
- 将IntWritable 类型value和在mapper一样在类里定义
- reduce方法编写
- 它的入参是k,数字类型的转换器接收的value集合和Context
- 计算value的和,可用加强型for,依次获取迭代器中的每个元素值,即为一个一个的词频数值,求出和sum
- 将sum赋值给上面IntWritable 类型value
- 用 context.write(key, value);将计算结果逐条输出
执行
- 打包,上传
- 运行
yarn jar 包名(加全类名可以指定运行哪个类) 要分析的文件夹 想要创建并输出分析结果的文件夹(目录)
三合一标准代码实现
把三个类放一块就行了,公用的东西可以提出来进一步优化
MapReduce Shell应用
查看
- mapred称为一级命令,直接输入mapred回车,即可查看二级命令
- 输入一级命令mapred后,再任意输入一个二级命令,即可查看三级命令
例
MapReduce技术特征
- 向“外”横向扩展,而非向“上”纵向扩展
- 移动计算,把处理向数据迁移(数据本地性)
- 顺序处理数据、避免随机访问数据
- 为应用开发隐藏系统底层细节
- 平滑无缝的可扩展性
- 推测执行
- 采用推测执行机制,发现某个任务的运行速度远低于任务平均速度,会为慢的任务启动一个备份任务,同时运行。哪个先运行完,采用哪个结果
- 失效被认为是常态
相关技术问题
1.判断一个输入串的类型
- 正则表达式
- Java实现正则表达式的核心类
-
Pattern 获取pattern实现,Pattern.compile -
Matcher Pattern.matcher可以得到matcher实例 其有3个匹配方法 Matches:全部匹配 lookingAt: 前向匹配 find:任意匹配 String digitalRegex="[\u4e00-\u9fa5]+";/*判断中文由ASCII u4e00-u9fa5 "[\d]+"判断数字 "[a-zA-Z]+"字母 */ String input=“中国123”; Pattern pattern= Pattern.compile(digitalRegex); Matcher matcher=pattern.matcher(input);
matcher 返回布尔类型true为是 - 强制转化
强制转化,报错就不是 - 现成的api
StringUtils.isNumbers();//判断是不是数字 2.hdfs当中数据可分block块存储和可切分计算的问题 - 是不是所有的文件都可以分块存储
是,都是二进制文件,可以切分 - 是不是所有的文件都可以进行切分
不是因为涉及到计算,需要针对数据的格式进行区分对待。
3.企业当中的研发环境的分类
- 生产环境-线上环境-product
- 试验田环境-高度模拟生产环境
- 测试环境-test
- 开发环境-dev
- QA环境(质量监控,独立的环境)
hdfs文件格式
面向行/列 类型名称 是否可切分 优点 缺点 适用场景 面向行 .txt 是 查看,编辑简单 无压缩占空间大、传输压力大、数据解析开销大 学习练习 面向列 .seq 是 原生,k,v二进制存储 数据堆起来的,本地查看不方便 生产环境,map输出的默认格式 面向列 .rc 是 横着切,放一个块里,再按列存储,加载快、查询快、空间利用率高、可高负载 好,但不是最好,中庸 学习生产 面向列 .orc 是 和.rc一样优点,支持新的数据类型,提高了速度 同上 同上
可切分性 类型名称 是否原生 优点 缺点 适用于 可切分 lzo(.lzo) 否 压缩/解压速度快,合理的压缩率 压缩率比gzip低,非原生、需要native安装 压缩完成后>=200M为宜,越大越好 可切分 bzip2(.bz2) 是 高压缩率超过gzip,原生支持、不需要native安装,用linux bzip可解压操作 压缩/解压速率慢 处理速度要求不高、压缩率要求高的情况 不可切分 gzip(.gz) 是 压缩/解压速度快,原生/native都支持,使用方便 不可切分,对cpu要求高 128M上下适宜 不可切分 snappy(.snappy) 否 高压缩/解压速度,合理的压缩率 压缩率比gzip低,非原生、需要native安装 适合作为map到reduce或是job数据流中间的数据传输格式
输出gzip格式
1.先在driver里使用参数解析器,就可以输入系统参数了
//参数解析器
GenericOptionsParser optionParser = new GenericOptionsParser(conf, args);
String[] remainingArgs = optionParser.getRemainingArgs();//Remaining是除了系统命令之外的应用参数
输入和输出路径也要改成remainingArgs[0]``remainingArgs[1]
2.命令差别
通过shell命令改动,添加参数设置模板: yarn jar jar_path main_class_path -Dk1=v1参数列表 具体应用:
yarn jar TlHadoopCore-jar-with-dependencies.jar \
com.tianliangedu.examples.WordCountV2 \
-Dmapred.output.compress=true \
-Dmapred.output.compression.codec=org.apache.hadoop.io.compress.GzipCodec \
/tmp/tianliangedu/input /tmp/tianliangedu/output19
自定义Partition
原来划分Partition是通过哈希值对reduce求模,return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;其中Integer.MAX_VALUE是用来让哈希值不为负,一般不需要,但是有时候可能重写
1.reduce数量确认时机
- 在Job提交后,任务正式开始计算之前即已经确定
- Map数量的确定:由输入数据文件的总大小、数据格式、块大小综合确定,待冲刺环节详解
- Reduce数量确定:系统根据输入数据量的大小自动确定,有固定的计算公式,超过1g就是两个
2.自定义reduce数量
yarn jar TlHadoopCore-jar-with-dependencies.jar \
com.tianliangedu.examples.WordCountV2 \
-Dmapred.output.compress=true \
-Dmapred.output.compression.codec=org.apache.hadoop.io.compress.GzipCodec \
-Dmapred.reduce.tasks=2 \
/tmp/tianliangedu/input /tmp/tianliangedu/output38
这个可以写个脚本,建一个.sh文件,首行#! /bin/bash表明bash运行,写命令,用sh 文件名运行
3.自定义Partition实现
通过继承Partitioner类,自定义实现Partition
/** 自定义Partition的定义 */
public static class MyHashPartitioner<K, V> extends Partitioner<K, V> {
/** Use {@link Object#hashCode()} to partition. */
public int getPartition(K key, V value, int numReduceTasks) {
return (key.toString().charAt(0) < 'q' ? 0 : 1) % numReduceTasks;
// return key.toString().charAt(0);
}
}
只需要改它的判定函数,👆代码就是判断小于q的是0大于的是1再和reduce模
4.通过代码中指定partition来实现
在driver中指定map类下指定
job.setPartitionerClass(MyHashPartitioner.class);
通过配置参数实现
因为有的计算比如平均数不能有combiner,所以可以在driver注释指定combiner类来去点combiner
|