IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> 一起重新开始学大数据-hadoop篇-day46 Hdfs、yarn的HA,mapreduce(1) -> 正文阅读

[大数据]一起重新开始学大数据-hadoop篇-day46 Hdfs、yarn的HA,mapreduce(1)

一起重新开始学大数据-hadoop篇-day46 Hdfs、yarn的HA,mapreduce(1)

在这里插入图片描述

HA(高可用)


HDFS的HA

相较于Hadoop1.x ,Hadoop2.x中的HDFS增加了两个重要特性,HA和Federation。HA即为High Availability,用于解决NameNode单点故障问题,该特性通过热备的方式为主NameNode提供一个备用者,一旦主NameNode出现故障,可以迅速切换至备NameNode,从而实现不间断对外提供服务。Federation即为“联邦”,该特性允许一个HDFS集群中存在多个NameNode同时对外提供服务,这些NameNode分管一部分目录(水平切分),彼此之间相互隔离,但共享底层的DataNode存储资源。
在这里插入图片描述

HA的failover原理

HDFS的HA,指的是在一个集群中存在两个NameNode,分别运行在独立的物理节点上。在任何时间点,只有一个NameNodes是处于Active状态,另一种是在Standby状态。 Active NameNode负责所有的客户端的操作,而Standby NameNode用来同步Active NameNode的状态信息,以提供快速的故障恢复能力。

为了保证Active NN与Standby NN节点状态同步,即元数据保持一致。除了DataNode需要向两个NN发送block位置信息外,还构建了一组独立的守护进程”JournalNodes”,用来同步Edits信息。当Active NN执行任何有关命名空间的修改,它需要持久化到一半以上的JournalNodes上。而Standby NN负责观察JNs的变化,读取从Active NN发送过来的Edits信息,并更新自己内部的命名空间。一旦ActiveNN遇到错误,Standby NN需要保证从JNs中读出了全部的Edits,然后切换成Active状态。
使用HA的时候,不能启动SecondaryNameNode,会出错。

HDFS的Federation

HDFS Federation设计可解决单一命名空间存在的以下几个问题:
(1)HDFS集群扩展性。多个NameNode分管一部分目录,使得一个集群可以扩展到更多节点,不再像1.0中那样由于内存的限制制约文件存储数目。
(2)性能更高效。多个NameNode管理不同的数据,且同时对外提供服务,将为用户提供更高的读写吞吐率。
(3)良好的隔离性。用户可根据需要将不同业务数据交由不同NameNode管理,这样不同业务之间影响很小。

Federation架构图

在这里插入图片描述

HDFS的HA搭建

(1-3执行过则不做)
执行前集群规划:

masternode1node2
NameNodeNameNode
JournalNodeJournalNodeJournalNode
DataNodeDataNodeDataNode

**************JournalNode-----日志节点,考虑日志的安全性

1、防火墙
service firewalld stop
2、时间同步
yum install ntp
ntpdate -u s2c.time.edu.cn

3、免密钥 (远程执行命令)
在两个主节点生成密钥文件
ssh-keygen -t rsa
ssh-copy-id ip

master-->master,node1,node2
node1-->master,node1,node2

4、修改hadoop配置文件
core-site.xml(将原来的修改为如下)

<configuration>
	<property>
		<name>fs.defaultFS</name>
		<value>hdfs://cluster</value>
	</property>
	<property>
		<name>hadoop.tmp.dir</name>
		<value>/usr/local/soft/hadoop-2.7.6/tmp</value>
	</property>
	<property>
		<name>fs.trash.interval</name>
		<value>1440</value>
	</property>
	<property>
	      <name>ha.zookeeper.quorum</name>
	      <value>master:2181,node1:2181,node2:2181</value>
	</property>
</configuration>

hdfs-site.xml(将原来的修改为如下)

<configuration>
<!-- 指定hdfs元数据存储的路径 -->
<property>
<name>dfs.namenode.name.dir</name>
<value>/usr/local/soft/hadoop-2.7.6/data/namenode</value>
</property>

<!-- 指定hdfs数据存储的路径 -->
<property>
<name>dfs.datanode.data.dir</name>
<value>/usr/local/soft/hadoop-2.7.6/data/datanode</value>
</property>

<!-- 数据备份的个数 -->
<property>
<name>dfs.replication</name>
<value>1</value>
</property>

<!-- 关闭权限验证 -->
<property>
<name>dfs.permissions.enabled</name>
<value>false</value>
</property>

<!-- 开启WebHDFS功能(基于REST的接口服务) -->
<property>
<name>dfs.webhdfs.enabled</name>
<value>true</value>
</property>

<!-- //以下为HDFS HA的配置// -->
<!-- 指定hdfs的nameservices名称为mycluster -->
<property>
<name>dfs.nameservices</name>
<value>cluster</value>
</property>

<!-- 指定cluster的两个namenode的名称分别为nn1,nn2 -->
<property>
<name>dfs.ha.namenodes.cluster</name>
<value>nn1,nn2</value>
</property>

<!-- 配置nn1,nn2的rpc通信端口 -->
<property>
<name>dfs.namenode.rpc-address.cluster.nn1</name>
<value>master:8020</value>
</property>
<property>
<name>dfs.namenode.rpc-address.cluster.nn2</name>
<value>node1:8020</value>
</property>

<!-- 配置nn1,nn2的http通信端口 -->
<property>
<name>dfs.namenode.http-address.cluster.nn1</name>
<value>master:50070</value>
</property>
<property>
<name>dfs.namenode.http-address.cluster.nn2</name>
<value>node1:50070</value>
</property>

<!-- 指定namenode元数据存储在journalnode中的路径 -->
<property>
<name>dfs.namenode.shared.edits.dir</name>
<value>qjournal://master:8485;node1:8485;node2:8485/cluster</value>
</property>

<!-- 指定journalnode日志文件存储的路径 -->
<property>
<name>dfs.journalnode.edits.dir</name>
<value>/usr/local/soft/hadoop-2.7.6/data/journal</value>
</property>

<!-- 指定HDFS客户端连接active namenode的java类 -->
<property>
<name>dfs.client.failover.proxy.provider.cluster</name>
<value>org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider</value>
</property>

<!-- 配置隔离机制为ssh -->
<property>
<name>dfs.ha.fencing.methods</name>
<value>
sshfence
shell(/bin/true)
</value>
</property>

<!-- 指定秘钥的位置 -->
<property>
<name>dfs.ha.fencing.ssh.private-key-files</name>
<value>/root/.ssh/id_rsa</value>
</property>

<!-- 开启自动故障转移 -->
<property>  
<name>dfs.ha.automatic-failover.enabled</name>
<value>true</value>
</property>
</configuration>

停止HDFS集群:stop-dfs.sh

同步到其它节点

	cd /usr/local/soft/hadoop-2.7.6/etc/hadoop
	scp ./* node1:`pwd`
	scp ./* node2:`pwd`

5、删除hadoop数据存储目录下的文件 每个节点都需要删除
rm -rf /usr/local/soft/hadoop-2.7.6/tmp

6、启动zookeeper 三台都需要启动
zkServer.sh start
zkServer.sh status

7、启动JN 存储hdfs元数据
三台JN上执行 启动命令:
/usr/local/soft/hadoop-2.7.6/sbin/hadoop-daemon.sh start journalnode

8、格式化 在一台NN上执行,这里选择master
hdfs namenode -format
启动当前的NN
hadoop-daemon.sh start namenode

9、执行同步 没有格式化的NN上执行 在另外一个namenode上面执行 这里选择node1
/usr/local/soft/hadoop-2.7.6/bin/hdfs namenode -bootstrapStandby

10、格式化ZK 在master上面执行
!!一定要先 把zk集群正常 启动起来
/usr/local/soft/hadoop-2.7.6/bin/hdfs zkfc -formatZK

11、启动hdfs集群,在master上执行
start-dfs.sh

YARN 的HA

在这里插入图片描述

???在Hadoop的YARN集群中,ResourceManager负责跟踪集群中的资源,以及调度应用程序(例如MapReduce作业)。在Hadoop2.4之前,集群中只有一个ResourceManager,当其中一个岩机时,将影响整个集群。高可用特性增加了冗余的形式,即一个活动/备用的ResourceManager对,以便可用进行故障转移。

RMStateStore

ResourceManager HA 由一对 Active,Standby节点构成,通过RMStataStore存储内部数据和主要应用的数据及标记。
目前支持的可替代的 RMStateStore实现有:
基于内存的 MemoryRMStateStore,基于文件系统的FileSystemRMStateStore,及基于Zookeeper的ZKRMStateStore。
ResourceManager HA的架构模式同NameNode HA的架构模式基本一致,数据共享由RMStateStore,而ZKFC称为ResourceManager进程的一个服务,非独立存在。

Yarn的HA搭建

yarn 高可用
1、修改配置文件
yarn-site.xml(将原来的修改为如下)

<configuration>
<!-- NodeManager上运行的附属服务,需配置成mapreduce_shuffle才可运行MapReduce程序 -->
<property>
<name>yarn.nodemanager.aux-services</name>
<value>mapreduce_shuffle</value>
</property>

<!-- 开启日志 -->
<property>
<name>yarn.log-aggregation-enable</name>
<value>true</value>
</property>

<!-- 配置日志删除时间为7天,-1为禁用,单位为秒 -->
<property>
<name>yarn.log-aggregation.retain-seconds</name>
<value>604800</value>
</property>


<!-- //以下为YARN HA的配置// -->
<!-- 开启YARN HA -->
<property>
<name>yarn.resourcemanager.ha.enabled</name>
<value>true</value>
</property>

<!-- 启用自动故障转移 -->
<property>
<name>yarn.resourcemanager.ha.automatic-failover.enabled</name>
<value>true</value>
</property>

<!-- 指定YARN HA的名称 -->
<property>
<name>yarn.resourcemanager.cluster-id</name>
<value>yarncluster</value>
</property>

<!-- 指定两个resourcemanager的名称 -->
<property>
<name>yarn.resourcemanager.ha.rm-ids</name>
<value>rm1,rm2</value>
</property>

<!-- 配置rm1,rm2的主机 -->
<property>
<name>yarn.resourcemanager.hostname.rm1</name>
<value>master</value>
</property>
<property>
<name>yarn.resourcemanager.hostname.rm2</name>
<value>node1</value>
</property>

<!-- 配置YARN的http端口 -->
<property>
<name>yarn.resourcemanager.webapp.address.rm1</name>
<value>master:8088</value>
</property>	
<property>
<name>yarn.resourcemanager.webapp.address.rm2</name>
<value>node1:8088</value>
</property>

<!-- 配置zookeeper的地址 -->
<property>
<name>yarn.resourcemanager.zk-address</name>
<value>master:2181,node1:2181,node2:2181</value>
</property>

<!-- 配置zookeeper的存储位置 -->
<property>
<name>yarn.resourcemanager.zk-state-store.parent-path</name>
<value>/rmstore</value>
</property>

<!-- 开启yarn resourcemanager restart -->
<property>
<name>yarn.resourcemanager.recovery.enabled</name>
<value>true</value>
</property>

<!-- 配置resourcemanager的状态存储到zookeeper中 -->
<property>
<name>yarn.resourcemanager.store.class</name>
<value>org.apache.hadoop.yarn.server.resourcemanager.recovery.ZKRMStateStore</value>
</property>

<!-- 开启yarn nodemanager restart -->
<property>
<name>yarn.nodemanager.recovery.enabled</name>
<value>true</value>
</property>

<!-- 配置nodemanager IPC的通信端口 -->
<property>
<name>yarn.nodemanager.address</name>
<value>0.0.0.0:45454</value>
</property>
</configuration>

停止yarn集群:stop-yarn.sh

同步到其它节点

	cd /usr/local/soft/hadoop-2.7.6/etc/hadoop
	scp ./* node1:`pwd`
	scp ./* node2:`pwd`

2、启动yarn 在master启动
start-yarn.sh

3、在node1上启动另一个RM
/usr/local/soft/hadoop-2.7.6/sbin/yarn-daemon.sh start resourcemanager

hdfs 常见问题

集群启动失败

??查看日志
hdfs文件无法操作

  • 一般是因为处于安全模式下
    • 离开安全模式:hdfs dfsadmin -safemode leave
    • 进入安全模式:hdfs dfsadmin -safemode enter
    • 查看安全模式:hdfs dfsadmin -safemode get

mapreduce(1)

MapReduce概述

  • MapReduce是一种分布式计算模型,由Google提出,主要用于搜索领域,解决海量数据的计算问题.
  • MapReduce是分布式运行的,由两个阶段组成:Map和Reduce,Map阶段是一个独立的程序,有很多个节点同时运行,每个节点处理一部分数据。Reduce阶段是一个独立的程序,有很多个节点同时运行,每个节点处理一部分数据【在这先把reduce理解为一个单独的聚合程序即可】。
  • MapReduce框架都有默认实现,用户只需要覆盖map()和reduce()两个函数,即可实现分布式计算,非常简单。
  • 这两个函数的形参和返回值都是<key、value>,使用的时候一定要注意构造<k,v>。

MapReduce完整图

在这里插入图片描述

Mapper、shuffer、Reducer(mapreduce简图)

在这里插入图片描述

环形缓冲区

在这里插入图片描述
在这里插入图片描述

MapReduce任务–简单版worldcount

package com.shujia.MapReduce;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;
/**
 * 读取文件
 * 统计每个单词出现的数量
 */
public class Demo2WordCount {
    //  Mapper<LongWritable, Text, Text, LongWritable>
    // Mapper<输入Map的key的类型,输入Map的value的类型,Map输出的Key的类型,Map输出的Value的类型>
    // Map默认的inputformat是TextInputFormat
    // TextInputFormat:会将数据每一行的偏移量作为key,每一行作为value输入到Map端
    public static class MyMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
        // 实现自己的Map逻辑

        /**
         * @param key:输入Map的key
         * @param value:输入Map的Value
         * @param context:MapReduce程序的上下文环境,Map端的输出可以通过context发送到Reduce端
         * @throws IOException
         * @throws InterruptedException
         */
        @Override
        protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, LongWritable>.Context context) throws IOException, InterruptedException {
            // 获取一行数据
            // hadoop,hive,hbase => hadoop,1  hive,1 hbase,1
            String line = value.toString();
            // 按照逗号分割
            String[] splits = line.split(",");
            for (String word : splits) {
                // 将结果发送到reduce端
                context.write(new Text(word), new LongWritable(1));
            }

        }
    }

    // Reducer<Text, LongWritable,Text, LongWritable>
    // Reducer<Map输出的Key的类型,Map输出的Value的类型,Reduce输出的Key的类型,Reduce输出的Value的类型>
    public static class MyReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
        /**
         * @param key:从Map端输出并经过分组后的key(相当于对Map输出的key做一个group             by)
         * @param values:从Map端输出并经过分组后的key                                对应的value的集合
         * @param context:MapReduce程序的上下文环境,Reduce端的输出可以通过context最终写到HDFS
         * @throws IOException
         * @throws InterruptedException
         */
        @Override
        protected void reduce(Text key, Iterable<LongWritable> values, Reducer<Text, LongWritable, Text, LongWritable>.Context context) throws IOException, InterruptedException {
            // hadoop,{1,1,1,1,1}
            long sum = 0;
            for (LongWritable value : values) {
                sum += value.get();
            }
            // 将最终结果输出
            context.write(key, new LongWritable(sum));
        }
    }

    // 组装MapReduce任务
    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        Configuration conf = new Configuration();
        // 配置reduce最终输出的key value的分隔符为 #
        conf.set("mapred.textoutputformat.separator", "#");
        // 做其他的配置

        // 创建一个Job
        Job job = Job.getInstance(conf);

        // 设置reduce的个数
        job.setNumReduceTasks(2);

        // 设置Job的名字
        job.setJobName("MyWordCountMapReduceApp");
        // 设置MapReduce运行的类
        job.setJarByClass(Demo2WordCount.class);

        // 配置Map
        // 配置Map Task运行的类
        job.setMapperClass(MyMapper.class);
        // 设置Map任务输出的key的类型
        job.setMapOutputKeyClass(Text.class);
        // 设置Map任务输出的value的类型
        job.setMapOutputValueClass(LongWritable.class);

        // 配置Reduce
        // 配置Reduce Task运行的类
        job.setReducerClass(MyReducer.class);
        // 设置Reduce任务输出的key的类型
        job.setOutputKeyClass(Text.class);
        // 设置Reduce任务输出的value的类型
        job.setOutputValueClass(LongWritable.class);

        // 配置输入输出路径
        // 将输入的第一个参数作为输入路径,第二个参数作为输出的路径
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // 提交任务并等待运行结束
        job.waitForCompletion(true);

    }
    /**
     * 打包并上传到master
     * 准备好输入的数据 并上传到HDFS的/input1
     *  hadoop jar jar包的路径 com.shujia.MapReduce.Demo2WordCount /input1 /output1
     */
}

|
|
|
|

上一章-hadoop篇-Day45-hdfs解析和zookeeper安装
下一章-随缘更新
|
|
|
|
|

听说长按大拇指👍会发生神奇的事情呢!好像是下面的画面,听说点过的人🧑一个月内就找到了对象的💑💑💑,并且还中了大奖💴$$$,考试直接拿满分💯,颜值突然就提升了😎,虽然对你好像也不需要,是吧,吴彦祖🤵!

在这里插入图片描述在这里插入图片描述在这里插入图片描述

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-09-02 11:26:45  更:2021-09-02 11:27:33 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/18 17:03:41-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码