IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> 2021-08-20 大数据实训第五(四)天 -> 正文阅读

[大数据]2021-08-20 大数据实训第五(四)天

一、MapReduce

(一)概述

1、mapreduce就是一种分布式的计算模型
2、由谷歌提出,基于hdfs的计算模型,主要就是解决海量数据计算的问题
3、MapReduce由两个阶段组成:Map和Reduce,用户只需要实现Map和Reduce两个函数就可以实现分布式的计算。这样做的目的是为了简化分不是程序的开发和调试周期

(二)组成

1、jobTracker/resourcemanager:任务调度者,管理多个TaskTracker
2、TaskTracker/nodemanager:任务执行者,

在map阶段刚开始的时候会先对这个要处理的文件进行切片处理
切片是一个split对象。这个对象记录了要处理文件的起始位置到切片的最后位置,但是一般情况下,切片的大小就是切块的大小
maptask就是map阶段用户处理任务的线程
maptask处理的逻辑是相同的。只是处理的数据不同
在map阶段,切片不需要程序员完成,按行读取也不需要程序员做,这些都是hadoop框架帮我们做的,程序员只需要编写map方法处理框架帮我们读取完一行数据后的业务逻辑
在默认情况下,maptask在拿到数据之后,默认是按行进行数据读取的
在map处理完数据进入reduce之前,hadoop框架帮助我们根据map输出的key做了聚合操作,也不需要程序员手动执行

(三)序列化

1、hadoop在传输数据时要求数据必须经过序列化 int intWritable string Text

(四)排序

1、hadoop会根据map输出的key进行排序
2、若想根据其它排序

(五)combiner

1、MapReduce中的Combiner是为了避免map任务和reduce任务之间的数据传输而设置的,Hadoop允许用户针对map task的输出指定一个合并函数。即为了减少传输到Reduce中的数据量。它主要是为了削减Mapper的输出从而减少网络带宽和Reducer之上的负载
2、combiner也是hadoop调优的一种手段
3、引入combiner对数据统计有用,对排序无用
4、与mapper与reducer不同的是,combiner没有默认的实现,需要显式的设置在conf中才有作用
5、一般来说,combiner和reducer它们俩进行同样的操作。
6、但是:特别值得注意的一点,一个combiner只是处理一个结点中的的输出,而不能享受像reduce一样的输入(经过了shuffle阶段的数据)

(六)多元输入

(七)数据本地化策略

移动的是计算

(八)Job的执行流程

1、客户端会提交一个mr的jar给jobclient,提交方式hadoop jar
a. 校验输入输出路径是否合法
b. 输入输出的kv类型是否正确
2、jobclient通过rpc和resourcemanager/jobtracker进行通信,返回一个存放jar包的位置(HDFS),生成一个jobid,jobid全局唯一。用于标识该job
3、jobclient会将jar写入到hdfs中
4、开始提交这个任务(任务的描述信息,不是jar。而是jar在hdfs存放的位置
5、jobtracker会进行初始化任务,开始读取hdfs上面要处理的文件,计算切片,每一个切片对应一个maptask
6、tasktracker开始通过心跳机制领取任务(满足数据本地化策略)
7、tasktracker开始下载jar包。开始计算(移动的是运算而不是数据——为了提高效率)将tasktracker和datanode部署到同一台服务器
8、tasktracker开始启动一个java的子进程去执行具体的计算(mappertask或者reducetask)
9、将结果写入hdfs中

(九)shuffle过程

在这里插入图片描述
在这里插入图片描述

(十)Yarn

1、yarn是一个资源调度框架
2、jobtracker既负责资源的调度,又负责任务的监控
3、Resourcemanager只负责资源的调度,任务的监控交给applicationmaster。applicationmaster启动到nodemanager身上
4、每一个任务都会有一个applicationmaster

二、在eclipse中准备进行MapReduce的环境

1、上传文件到Hdfs中

在这里插入图片描述
在这里插入图片描述

2、新建java Project

在这里插入图片描述

3、添加依赖库

(1)将之前B_HDFS下的hdfs_jar复制到新建的Javaproject(C_MapReduce)中
在这里插入图片描述
(2)将以下四个部分的内容复制到上述hdfs_jar中
在这里插入图片描述
在这里插入图片描述
(3)添加依赖 build path

三、WordCount(统计单词出现的次数)

1、WordCountMapper

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

//--代表Map阶段
//--hadoop要求传输的数据必须要经过序列化
/**
 * KEYIN:map输入的key的类型--输入每一行数据的行首偏移量:LongWritable
 * VALUEIN:map输入的value的类型--代表每一行数据:Text
 * KEYOUT:map输出的key的类型:Text
 * VALUEOUT:map输出的value类型:IntWritable/LongWritable
 */
public class WordCountMapper extends Mapper<LongWritable,Text,Text,LongWritable> {
	@Override
	//--hadoop框架默认是按行读取,每读取一行数据就会调用一次map方法。程序员只需要在map方法中实现业务逻辑即可
	//--context就是一个上下文对象,用于输出map的结果
	protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, LongWritable>.Context context)
			throws IOException, InterruptedException {
		//--hello tom hello bob
		String line = value.toString();
		//--按空格进行切分[hello world hello bob ]
		String[] words = line.split(" ");
		//--循环遍历输出
		for (String word : words) {
			//--进入到reduce之前会根据map输出的key进行聚合
			context.write(new Text(word), new LongWritable(1));
		}
	}

}

2、WordCountReduce

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
/*
 * KEYIN:map输出的key的类型
 *VALUEIN:map输出的value的类型
 * KEYOUT:reduce输出的key的类型
 * VALUEOUT:reduce输出的value的类型
 * 
 * **/
public class WordCountReduce extends Reducer<Text, LongWritable, Text, LongWritable>{
	@Override
	//key代表的是map的输出的key
	//-value是一个迭代器,里面存放的就是hadoop框架帮我们聚合后的结果【1 1 1 1 1 1】
	protected void reduce(Text key, Iterable<LongWritable> value,
			Reducer<Text, LongWritable, Text, LongWritable>.Context context) throws IOException, InterruptedException {
		//--定义一个变量用与存放单词的总数
		long sum=0;
		for (LongWritable count : value) {
			sum+=count.get();
		}
		context.write(key, new LongWritable(sum));
	}
}

3、WordCountDriver

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCountDriver {
	public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
		//--创建一个换环境配置对象
		Configuration conf=new Configuration();
		//--需要向yarn去申请一个job任务
		Job job = Job.getInstance(conf);
		//-设置job作业运行的入口类
		job.setJarByClass(WordCountDriver.class);
		//--设置mapper类
		job.setMapperClass(WordCountMapper.class);
		//--设置reduce类
		job.setReducerClass(WordCountReduce.class);
		//--设置mapper的输出类型
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(LongWritable.class);
		//--设置redcue的输出类型
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(LongWritable.class);
		//--设置要处理的文件的路径org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
		FileInputFormat.addInputPath(job, new Path("hdfs://192.168.232.129:9000/txt/words.txt"));
		//--设置计算完的数据存放的路径
		FileOutputFormat.setOutputPath(job, new Path("hdfs://192.168.232.129:9000/wordcount"));
		//--提交任务
		job.waitForCompletion(true);
	}
}

4、导出为jar

(1)选中要导出的项目,右击,点击export
在这里插入图片描述
(2)点击java->JAR file->next
在这里插入图片描述
(3)点击要导出项目前的”>“标志,只选择src,并去掉.classpath与.project的勾选,并选择导出到的目录,点击next
在这里插入图片描述
(4)点击next
在这里插入图片描述
(5)选择主类->finish
在这里插入图片描述
(6)点击ok
在这里插入图片描述

5、将jar包上传到Linux,并运行

(1)上传
在这里插入图片描述
(2)运行

hadoop jar WordCount.jar cn.edu.mapreduce.wordcount.WordCountDriver

其中cn.edu.mapreduce.wordcount.WordCountDriver可以经过如下过程复制得到
在这里插入图片描述

6、若要统计字符数,只需改动

(1)去掉空格
在这里插入图片描述
(2)更改输出目录
在这里插入图片描述

7、combiner(提高效率)

(1)WordCountCombiner.class

import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class WordCountCombiner extends Reducer<Text, LongWritable, Text, LongWritable>{
	@Override
	protected void reduce(Text key, Iterable<LongWritable> value,
			Reducer<Text, LongWritable, Text, LongWritable>.Context context) throws IOException, InterruptedException {
		//--定义一个变量用与存放单词的总数
		long sum=0;
		for (LongWritable count : value) {
			sum+=count.get();
		}
		context.write(key, new LongWritable(sum));
	}
}

(2)在WordCountDriver中添加如下内容

//--设置combiner合并类
		job.setCombinerClass(WordCountCombiner.class);

四、Flow(统计流量)

1、flow.java(按手机号码统计流量)

(1)代码

package cn.edu.mapreduce.flow;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;

//--flow对象必须要经过序列化
//--实现序列化,hadoop要求实现Writable接口
public class Flow implements Writable{
	
	private String phone = "";
	private String address = "";
	private String name = "";
	private int flow ;
	//--shift+alt+s
	public String getPhone() {
		return phone;
	}
	public void setPhone(String phone) {
		this.phone = phone;
	}
	public String getAddress() {
		return address;
	}
	public void setAddress(String address) {
		this.address = address;
	}
	public String getName() {
		return name;
	}
	public void setName(String name) {
		this.name = name;
	}
	public int getFlow() {
		return flow;
	}
	public void setFlow(int flow) {
		this.flow = flow;
	}
	public Flow(String phone, String address, String name, int flow) {
		super();
		this.phone = phone;
		this.address = address;
		this.name = name;
		this.flow = flow;
	}
	public Flow() {
		super();
	}
	@Override
	public String toString() {
		return "Flow [phone=" + phone + ", address=" + address + ", name=" + name + ", flow=" + flow + "]";
	}
	@Override
	//--反序列化
	public void readFields(DataInput in) throws IOException {
		this.phone = in.readUTF();
		this.address = in.readUTF();
		this.name = in.readUTF();
		this.flow = in.readInt();		
	}
	//--序列化 序列化与反序列化顺序要一致
	@Override
	public void write(DataOutput out) throws IOException {
		out.writeUTF(phone);
		out.writeUTF(address);
		out.writeUTF(name);
		out.writeInt(flow);
	}
}

(2)小技巧
shift+alt+s
在这里插入图片描述

2、FlowMapper

package cn.edu.mapreduce.flow;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class FlowMapper extends Mapper<LongWritable, Text, Text, Flow> {
	public void map(LongWritable ikey, Text ivalue, Context context) throws IOException, InterruptedException {
		//--读取一行数据封装对象[13877779999 bj zs 2145]
		String[] datas = ivalue.toString().split(" ");
		Flow f = new Flow();
		f.setPhone(datas[0]);
		f.setAddress(datas[1]);
		f.setName(datas[2]);
		f.setFlow(Integer.parseInt(datas[3]));
		//--对外输出
		context.write(new Text(f.getPhone()), f);
	}
}

3、FlowReducer

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class FlowReducer extends Reducer<Text, Flow, Text, IntWritable> {
	public void reduce(Text _key, Iterable<Flow> values, Context context) throws IOException, InterruptedException {
		int sum = 0;
		for (Flow val : values) {
			sum += val.getFlow();
		}
		context.write(_key, new IntWritable(sum));
	}
}

4、FlowDriver

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class FlowDriver {
	public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration();
		Job job = Job.getInstance(conf, "FlowDriver");
		job.setJarByClass(cn.edu.mapreduce.flow.FlowDriver.class);
		// TODO: specify a mapper
		job.setMapperClass(FlowMapper.class);
		// TODO: specify a reducer
		job.setReducerClass(FlowReducer.class);
		
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(Flow.class);

		// TODO: specify output types
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(IntWritable.class);

		// TODO: specify input and output DIRECTORIES (not files)
		FileInputFormat.setInputPaths(job, new Path("hdfs://192.168.232.129:9000/txt/flow.txt"));
		FileOutputFormat.setOutputPath(job, new Path("hdfs://192.168.232.129:9000/flowresult"));

		if (!job.waitForCompletion(true))
			return;
	}
}

5、分地区计算流量(须自定义分区)

若不自定义分区,hadoop会根据
(1)步骤

1,自定义分区需要继承一个partitioner的类
2,重写类中的getpartition的方法通过返回的int值确定分区3,需要在driver中设置自定义分区类
3,在driver中设置reducetask的数量(分区的数量等于reducetask的数量)
4,每一个reducetask会对应一个输出文件。

(2)FlowPartitioner

package cn.edu.mapreduce.flow;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

//KEYmap输出的key
//VALUEmap输出的value的类型
public class FlowPartitioner extends Partitioner<Text, Flow>{
	@Override
	public int getPartition(Text key, Flow value, int arg2) {
		// return 0 代表第一个分区,return 1 表示第二个分区
		if(value.getAddress().equals("bj")) {
			return 0;
		}else if(value.getAddress().equals("sh")) {
			return 1;
		}else {
			return 2;
		}
	}
}

(3)FlowDriver2

package cn.edu.mapreduce.flow;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class FlowDriver2 {
	public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration();
		Job job = Job.getInstance(conf, "FlowDriver");
		job.setJarByClass(cn.edu.mapreduce.flow.FlowDriver.class);
		// TODO: specify a mapper
		job.setMapperClass(FlowMapper.class);
		// TODO: specify a reducer
		job.setReducerClass(FlowReducer.class);
		
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(Flow.class);
		
		//--定义自定义分区类
		job.setPartitionerClass(FlowPartitioner.class);
		
		// TODO: specify output types
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(IntWritable.class);
		
		//--设置reducetask的数量
		job.setNumReduceTasks(3);
		
		// TODO: specify input and output DIRECTORIES (not files)
		FileInputFormat.setInputPaths(job, new Path("hdfs://192.168.232.129:9000/txt/flow.txt"));
		FileOutputFormat.setOutputPath(job, new Path("hdfs://192.168.232.129:9000/flowresult"));

		if (!job.waitForCompletion(true))
			return;
	}
}

五、Student(排序)

1、Student .java(按成绩降序排序)

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.WritableComparable;
//--如果在javaBean对象中实现自定义排序的需求,需要实现一个WritableComparable
public class Student implements WritableComparable<Student>{
	private String name = "";
	private int score;
	
	public String getName() {
		return name;
	}
	public void setName(String name) {
		this.name = name;
	}
	public int getScore() {
		return score;
	}
	public void setScore(int score) {
		this.score = score;
	}
	
	public Student(String name, int score) {
		super();
		this.name = name;
		this.score = score;
	}
	public Student() {
		super();
	}

	@Override
	public String toString() {
		return "Student [name=" + name + ", score=" + score + "]";
	}

	@Override
	public void readFields(DataInput in) throws IOException {
		// TODO Auto-generated method stub
		this.name = in.readUTF();
		this.score = in.readInt();
	}
	@Override
	public void write(DataOutput out) throws IOException {
		// TODO Auto-generated method stub
		out.writeUTF(name);
		out.writeInt(score);
	}

	@Override
	//--定义比较规则
	//--this在前是升序,this在后是降序
	public int compareTo(Student o) {
		// TODO Auto-generated method stub
		return o.score - this.score;
	}
}

2、StudentMapper.java

import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class StudentMapper extends Mapper<LongWritable, Text, Student, NullWritable> {
	public void map(LongWritable ikey, Text ivalue, Context context) throws IOException, InterruptedException {
		String datas[] = ivalue.toString().split(" ");
		Student student = new Student();
		student.setName(datas[0]);
		student.setScore(Integer.parseInt(datas[1]));
		context.write(student, NullWritable.get());
	}
}

3、StudentReducer.java

import java.io.IOException;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class StudentReducer extends Reducer<Student, NullWritable, Student, NullWritable> {
	public void reduce(Student _key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
		context.write(_key, NullWritable.get());
	}
}

4、StudentDriver.java

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class StudentDriver {
	public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration();
		Job job = Job.getInstance(conf, "StudenDriver");
		job.setJarByClass(cn.edu.mapreduce.score.StudentDriver.class);
		job.setMapperClass(StudentMapper.class);
		job.setReducerClass(StudentReducer.class);

		job.setOutputKeyClass(Student.class);
		job.setOutputValueClass(NullWritable.class);

		FileInputFormat.setInputPaths(job, new Path("hdfs://192.168.232.129:9000/txt/student.txt"));
		FileOutputFormat.setOutputPath(job, new Path("hdfs://192.168.232.129:9000/studentresult"));

		if (!job.waitForCompletion(true))
			return;
	}
}

5、Student2.java(按月份升序、成绩降序排序)

相对于Student1的不同,增加了month属性,并且改变排序规则,排序代码如下
@Override
	public int compareTo(Student2 o) {
		int r1 = this.month - o.month;
		if (r1 == 0)
			return o.score - this.score;
		else 
			return r1;
	}

6、StudentMapper2、StudentReducer2、StudentDriver2

与前面的基本相同,只需将StudentMapper、StudentReducer、StudentDriver中出现的Student改为Student2

六、CharCount

1、CharCountMapper.java

public class CharCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
	public void map(LongWritable ikey, Text ivalue, Context context) throws IOException, InterruptedException {
			String line = ivalue.toString();
			char[] chars = line.toCharArray();
			for (char c : chars) {
				context.write(new Text(c+""), new IntWritable(1));
			}
	}
}

2、CharCountReducer.java

public class CharCountReducer extends Reducer<Text, IntWritable, Text, LongWritable> {
	public void reduce(Text _key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
		long sum=0;
		for (IntWritable val : values) {
				sum+=val.get();
		}
		context.write(_key, new LongWritable(sum));
	}
}

3、CharCountDriver.java

public class CharCountDriver {
	public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration();
		Job job = Job.getInstance(conf, "charcountdriver");
		job.setJarByClass(cn.edu.mapreduce.charcount.CharCountDriver.class);
		// TODO: specify a mapper
		job.setMapperClass(CharCountMapper.class);
		// TODO: specify a reducer
		job.setReducerClass(CharCountReducer.class);
		
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(IntWritable.class);
		
		// reduce输出类型
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(LongWritable.class);

		// TODO: specify input and output DIRECTORIES (not files)
		FileInputFormat.setInputPaths(job, new Path("hdfs://192.168.60.128:9000/txt/characters.txt"));
		FileOutputFormat.setOutputPath(job, new Path("hdfs://192.168.60.128:9000/charcount"));

		if (!job.waitForCompletion(true))
			return;
	}

}

4、CharCountDriver2(多个输入文件)

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class CharCountDriver2 {
	public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration();
		Job job = Job.getInstance(conf, "JobName");
		job.setJarByClass(cn.edu.mapreduce.charcount.CharCountDriver2.class);
		// TODO: specify a mapper
		job.setMapperClass(CharCountMapper.class);
		// TODO: specify a reducer
		job.setReducerClass(CharCountReducer.class);
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(IntWritable.class);
		// TODO: specify output types
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(LongWritable.class);

		// TODO: specify input and output DIRECTORIES (not files)
		//FileInputFormat.setInputPaths(job, new Path("src"));
		//--import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
		MultipleInputs.addInputPath(job, new Path("hdfs://192.168.232.129:9000/txt/words.txt"), TextInputFormat.class);
		MultipleInputs.addInputPath(job, new Path("hdfs://192.168.232.129:9000/txt/characters.txt"), TextInputFormat.class);
		FileOutputFormat.setOutputPath(job, new Path("hdfs://192.168.232.129:9000/charcount2"));

		if (!job.waitForCompletion(true))
			return;
	}
}

七、一些eclipse小技巧

1、eclipse自动补全

windows->perference->java->Editor->Content Assist
在下图的粉色框中输入

.qwertyuioplkjhgfdsazxcvbnm

在这里插入图片描述

2、eclipse:自动覆盖@Override和实现接口

右击->source->Override/Implement Methods…(列出父类有什么函数,我们要覆盖哪个函数):
在这里插入图片描述

3、eclipse快捷键

shift+alt+s

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-08-21 15:32:29  更:2021-08-21 15:32:57 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/18 17:59:52-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码