写的不到位的地方,欢迎评论指出不足之处
- 将服务器端的 hadoop 的四个文件
- core-site.xml、hdfs-site.xml、mapred-site.xml、yarn-site.xml
- 复制到项目下的 resources
- pom.xml 配置
<!--客户端聚合包-->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>2.6.5</version>
</dependency>
<property>
<name>fs.defaultFS</name>
<value>hdfs://myHA</value>
</property>
<property>
<name>ha.zookeeper.quorum</name>
<value>two:2181,three:2181,four:2181</value>
</property>
<property>
<name>ipc.client.connect.timeout</name>
<value>90000</value>
<description>客户端通过socket连接到服务器的超时时间</description>
</property>
<property>
<name>ipc.client.connect.max.retries</name>
<value>100</value>
<description>指示客户端建立服务器连接的重试次数</description>
</property>
<property>
<name>ipc.client.connect.retry.interval</name>
<value>10000</value>
<description>指示客户端在重试建立服务器连接之前等待的毫秒数</description>
</property>
<property>
<name>dfs.permissions</name>
<value>false</value>
</property>
<property>
<name>dfs.replication</name>
<value>2</value>
<description>副本数量,可以在创建文件时指定副本的实际数目。
如果在创建时未指定复制,则使用默认值</description>
</property>
<property>
<name>dfs.namenode.name.dir</name>
<value>/var/hadoop/ha/dfs/name</value>
<description>存放namenode的名称表(fsimage)的目录。
如果这是一个逗号分隔的目录列表,那么在所有目录中复制名称表,用于冗余</description>
</property>
<property>
<name>dfs.datanode.data.dir</name>
<value>/var/hadoop/ha/dfs/data</value>
<description>存放datanode块的目录。
如果这是一个逗号分隔的目录列表,那么数据将存储在所有命名的目录中,通常存储在不同的设备上</description>
</property>
<property>
<name>dfs.nameservices</name>
<value>myHA</value>
<description>逗号分隔的名称服务器列表</description>
</property>
<property>
<name>dfs.ha.namenodes.myHA</name>
<value>nn1,nn2</value>
<description>namenode的ID</description>
</property>
<property>
<name>dfs.namenode.rpc-address.myHA.nn1</name>
<value>one:8020</value>
</property>
<property>
<name>dfs.namenode.rpc-address.myHA.nn2</name>
<value>two:8020</value>
</property>
<property>
<name>dfs.namenode.http-address.myHA.nn1</name>
<value>one:50070</value>
</property>
<property>
<name>dfs.namenode.http-address.myHA.nn2</name>
<value>two:50070</value>
</property>
<property>
<name>dfs.namenode.shared.edits.dir</name>
<value>qjournal://one:8485;two:8485;three:8485/myHA</value>
<description>设置一组JournalNode的URI地址</description>
</property>
<property>
<name>dfs.journalnode.edits.dir</name>
<value>/var/hadoop/ha/dfs/jnode</value>
<description>JournalNode所在节点上的一个目录,用于存放editlog和其他状态信息</description>
</property>
<property>
<name>dfs.client.failover.proxy.provider.myHA</name>
<value>org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider</value>
<description>为主机配置的故障转移代理提供程序</description>
</property>
<property>
<name>dfs.ha.fencing.methods</name>
<value>sshfence</value>
<description>在出现故障时通过哪种方式登录到另一个namenode上进行接管工作</description>
</property>
<property>
<name>dfs.ha.fencing.ssh.private-key-files</name>
<value>/root/.ssh/id_rsa</value>
<description>使用私钥</description>
</property>
<property>
<name>dfs.ha.automatic-failover.enabled</name>
<value>true</value>
<description>是否启用自动故障转移</description>
</property>
<property>
<name>dfs.qjournal.start-segment.timeout.ms</name>
<value>90000</value>
<description>启动Journal超时</description>
</property>
<property>
<name>dfs.qjournal.select-input-streams.timeout.ms</name>
<value>90000</value>
<description>JournalManager超时时间</description>
</property>
<property>
<name>dfs.qjournal.write-txns.timeout.ms</name>
<value>90000</value>
<description>JournalNode超时时间</description>
</property>
<property>
<name>mapreduce.framework.name</name>
<value>yarn</value>
</property>
<property>
<name>yarn.nodemanager.aux-services</name>
<value>mapreduce_shuffle</value>
<description>以逗号分隔的服务列表</description>
</property>
<property>
<name>yarn.resourcemanager.ha.enabled</name>
<value>true</value>
<description>启用RM高可用性</description>
</property>
<property>
<name>yarn.resourcemanager.zk-address</name>
<value>two:2181,three:2181,four:2181</value>
<description>指定用于存储RM状态和进行Leader选举所使用的ZK-quorum列表</description>
</property>
<property>
<name>yarn.resourcemanager.cluster-id</name>
<value>myHA</value>
<description>集群的名称</description>
</property>
<property>
<name>yarn.resourcemanager.ha.rm-ids</name>
<value>rm1,rm2</value>
<description>启用HA时群集中的RM节点列表</description>
</property>
<property>
<name>yarn.resourcemanager.hostname.rm1</name>
<value>three</value>
<description>RM的主机名</description>
</property>
<property>
<name>yarn.resourcemanager.hostname.rm2</name>
<value>four</value>
<description>RM的主机名</description>
</property>
<property>
<name>yarn.resourcemanager.webapp.address.rm1</name>
<value>three:8088</value>
<description>RM Web应用程序的http地址</description>
</property>
<property>
<name>yarn.resourcemanager.webapp.address.rm2</name>
<value>four:8088</value>
<description>RM Web应用程序的http地址</description>
</property>
- Idea 代码 Client:MyWordCount
package com.my.hadoop.mapreduce.wc;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import java.net.URI;
import java.util.Arrays;
public class MyWordCount {
public static void main(String[] args) throws Exception {
// 读取配置文件
Configuration conf = new Configuration(true);
// 让框架知道是 windows异构平台运行
conf.set("mapreduce.app-submission.cross-platform", "true");
// 配置为本地测试
// conf.set("mapreduce.framework.name","local");
// 协调与管理资源
Job job = Job.getInstance(conf);
String [] other = {"target/Big-Data-1.0-0.1.jar",
"/data/root","/data/output"};
job.setJar(other[0]);
// 当前类名
job.setJarByClass(MyWordCount.class);
// 为当前的管理取个名
job.setJobName("myYarnText");
// 读数据和存储数据的位置
Path inputFile = new Path(other[1]);
TextInputFormat.addInputPath(job, inputFile);
Path outFile = new Path(other[2]);
if (outFile.getFileSystem(conf).exists(outFile))
outFile.getFileSystem(conf).delete(outFile, true);
// 数据格式化并存储文件
TextOutputFormat.setOutputPath(job, outFile);
// 处理每一条数据
job.setMapperClass(MyMapper.class);
// 提交时的数据类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
// 处理每一组数据
job.setReducerClass(MyReducer.class);
// 提交后关闭
job.waitForCompletion(true);
}
}
package com.my.hadoop.mapreduce.wc;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
import java.util.StringTokenizer;
/**
* Hadoop框架中,它是一个分布式
* 数据:序列化、反序列化
* Hadoop有自身一套的序列化、反序列化
* 或者自已开发类型
* 必须:实现序列化、反序列化接口、实现比较器接口
* 排序:比较
* 字典序、数值顺序
*/
public class MyMapper extends Mapper<Object, Text, Text, IntWritable> {
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
/**
* TextInputFormat 文本
* @param key :是每一行字符串第一个字节面向源文件的偏移量
* @param value: 是每一行字符串的内容
* @param context
* @throws IOException
* @throws InterruptedException
*/
public void map(Object key, Text value, Context context) throws
IOException, InterruptedException {
StringTokenizer itr = new StringTokenizer(value.toString());
while (itr.hasMoreTokens()) {
word.set(itr.nextToken());
context.write(word, one);
}
}
}
package com.my.hadoop.mapreduce.wc;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
public class MyReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
private IntWritable result = new IntWritable();
/**
* 相同的key为一组,这一组数据调用一次reduce
* @param key
* @param values
* @param context
* @throws IOException
* @throws InterruptedException
*/
public void reduce(Text key, Iterable<IntWritable> values,
Context context) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
result.set(sum);
context.write(key, result);
}
}


- ?注意
- 运行时会遇到用户权限的问题
- 1、将 hadoop 系统中目录设置权限
- 2、将 hdfs-site.xml 配置关闭检测
- 在 windows 上异构平台运行
- 需要在 windows 上解压 hadoop 并配置环境变量
- 需要?hadoop-on-windows-master.zip 中的 bin 复制到 hadoop 里
|