IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> 【hadoop学习项目】6. 使用Partition自定义分区处理数据 -> 正文阅读

[大数据]【hadoop学习项目】6. 使用Partition自定义分区处理数据

0. 项目结构

在这里插入图片描述

训练数据

phone address name consum
13877779999 bj zs 2145
13766668888 sh ls 1028
13766668888 sh ls 9987
13877779999 bj zs 5678
13544445555 sz ww 10577
13877779999 sh zs 2145
13766668888 sh ls 9987
13877779999 bj zs 2184
13766668888 sh ls 1524
13766668888 sh ls 9844
13877779999 bj zs 6554
13544445555 sz ww 10584
13877779999 sh zs 21454
13766668888 sh ls 99747

目标:统计不同地方的购物金额总和,在shuffle阶段将数据按地区进行分区。

1. FlowBean

package hadoop_test.partition_test_06.domain;

import org.apache.hadoop.io.Writable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

//实现一个writable接口,并重写两个方法,write表示序列化,readFields表示反序列化
public class FlowBean implements Writable {
	
	private String phone;
	private String addr;
	private String name;
	private int consum;
	
	@Override
	public void write(DataOutput out) throws IOException {
		out.writeUTF(phone);
		out.writeUTF(addr);
		out.writeUTF(name);
		out.writeInt(consum);
		
	}

	@Override
	public void readFields(DataInput in) throws IOException {
		this.phone=in.readUTF();
		this.addr=in.readUTF();
		this.name=in.readUTF();
		this.consum=in.readInt();
		
	}
	
	public String getPhone() {
		return phone;
	}
	public void setPhone(String phone) {
		this.phone = phone;
	}
	public String getAddr() {
		return addr;
	}
	public void setAddr(String addr) {
		this.addr = addr;
	}
	public String getName() {
		return name;
	}
	public void setName(String name) {
		this.name = name;
	}
	public int getFlow() {
		return consum;
	}
	public void setFlow(int flow) {
		this.consum = flow;
	}
	@Override
	public String toString() {
		return "FlowBean [phone=" + phone + ", addr=" + addr + ", name=" + name + ", consum=" + consum + "]";
	}
	
	

}

2. FlowDriver

package hadoop_test.partition_test_06.flow;

import hadoop_test.Utils_hadoop;
import hadoop_test.partition_test_06.domain.FlowBean;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;


public class FlowDriver {
	public static void main(String[] args) throws Exception {
		System.setProperty("HADOOP_USER_NAME", "root");

		Configuration conf=new Configuration();
		Job job=Job.getInstance(conf);

		job.setJarByClass(FlowDriver.class);
		job.setMapperClass(FlowMapper.class);
		job.setReducerClass(FlowReducer.class);

		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(FlowBean.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(FlowBean.class);

		//配置partition
		job.setNumReduceTasks(3);		// 指定任务数,规定分区数≥任务数。一般来说任务数=分区数
		job.setPartitionerClass(FlowPartitioner.class);

		FileInputFormat.setInputPaths(job,new Path("/hadoop_test/avro/avro.txt"));
		FileOutputFormat.setOutputPath(job,new Path("/hadoop_test/avro/result"));
		job.waitForCompletion(true);
	}

}

3. FlowMapper

package hadoop_test.partition_test_06.flow;

import hadoop_test.partition_test_06.domain.FlowBean;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class FlowMapper extends Mapper<LongWritable, Text, Text, FlowBean> {
	@Override
	protected void map(LongWritable key, Text value, Context context)
			throws IOException, InterruptedException {
		String line=value.toString();
		// 实例化
		FlowBean flowBean=new FlowBean();
		// 给一个实例的属性赋予初始值,
		flowBean.setPhone(line.split(" ")[0]);
		flowBean.setAddr(line.split(" ")[1]);
		flowBean.setName(line.split(" ")[2]);
		// 统计总额,同时将文本数据转换为int型
		flowBean.setFlow(Integer.parseInt(line.split(" ")[3]));

		context.write(new Text(flowBean.getName()), flowBean);
	}
}

4. FlowPartitioner

package hadoop_test.partition_test_06.flow;

import hadoop_test.partition_test_06.domain.FlowBean;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

import java.util.Random;


public class FlowPartitioner extends Partitioner<Text,FlowBean> {
	@Override
	//<Text,FlowBean>指的是map的key,value
	public int getPartition(Text key, FlowBean value, int numPartitions) {
//数据倾斜解决
//		Random R = new Random();
//
//		String hash_key=key.toString()+String.valueOf(R.nextInt());
//		return (hash_key .hashCode() & Integer.MAX_VALUE) % numPartitions;

		if(value.getAddr().equals("sh")){
			return 0;
		}
		if(value.getAddr().equals("bj")){
			return 1;
		}
		else{
			return 2;
		}
		
	}

}

Partition

分区主要的作用就是将相同的数据发送到同一个reduceTask里面去,从而将不同分区的Key交由不同的Reduce处理。在MapReduce中有一个抽象类叫做Partitioner,默认使用的实现类是HashPartitioner(按照key的hashCode % reduceTask 数量 = 分区号,默认reduce Task = 1)。

(1)配置Partition

设置相应的数量的ReduceTask

job.setNumReduceTasks(reduceTask数量);
  • 如果ReduceTask的数量 > getPartition的数量,则会多产生几个空的输出文件part-r-000xx
  • 如果ReduceTask的数量 < getPartition的数量,则有一部分数据会丢失,会抛出异常
  • 如果ReduceTask的数量 1 ,则不管MapTask输出多少个分区文件,最终都交给一个ReduceTask,最终也就只会产生一个结果文件
    分区号必须从零开始,逐一累加。

指定自定义的Partitioner

job.setPartitionerClass(实现Partition类的名字.class);

自定义的类,要捆绑到job中,否则job还会走默认的分区代码块,使用HashPartitioner实现。

(2)Partitioner类

所有partitioner都继承自抽象类Partitioner ,实现getPartition(KEY var1, VALUE var2, intvar3)方法

//
// Source code recreated from a .class file by IntelliJ IDEA
// (powered by Fernflower decompiler)
//

package org.apache.hadoop.mapreduce;

import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Stable;

@Public
@Stable
public abstract class Partitioner<KEY, VALUE> {
    public Partitioner() {
    }

    public abstract int getPartition(KEY var1, VALUE var2, int var3);
}

hadoop自带Partitioner有:
在这里插入图片描述
HashPartitioner类

//
// Source code recreated from a .class file by IntelliJ IDEA
// (powered by Fernflower decompiler)
//

package org.apache.hadoop.mapreduce.lib.partition;

import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Stable;
import org.apache.hadoop.mapreduce.Partitioner;

@Public
@Stable
public class HashPartitioner<K, V> extends Partitioner<K, V> {
    public HashPartitioner() {
    }

    public int getPartition(K key, V value, int numReduceTasks) {
        return (key.hashCode() & 2147483647) % numReduceTasks;
    }
}

其中2147483647为32位的最大正整数01111111111111111111111111111111

参考资料:
Hadoop MapReduce Shuffle机制之Partition分区 | 及分区案例实操
Hadoop自定义分区
MapReduce 进阶:Partitioner 组件

5. FlowReducer

package hadoop_test.partition_test_06.flow;

import hadoop_test.partition_test_06.domain.FlowBean;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class FlowReducer extends Reducer<Text, FlowBean,Text,FlowBean> {

	@Override
	protected void reduce(Text name, Iterable<FlowBean> values,
                          Context context)
			throws IOException, InterruptedException {
		FlowBean tmp=new FlowBean();
		for(FlowBean flowbean:values){
//		flowbean [phobe=13766668888,add=sh,name=ls,consum=9844]
			tmp.setAddr(flowbean.getAddr());
			tmp.setPhone(flowbean.getPhone());
			tmp.setName(flowbean.getName());
//     tmp.getComsum(初始化是0)+flowbean.getConsum()[9844]
			tmp.setFlow(tmp.getFlow()+flowbean.getFlow());
//			在第一轮 tmp.Consum = tmp.getConsum()=[9844]
//			在第二轮时,FlowBean [phobe=13766668888,add=sh,name=ls,consum=1000]
		}
		context.write(name, tmp);
	}
}

输出结果
在这里插入图片描述

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2022-02-09 20:46:17  更:2022-02-09 20:47:13 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年11日历 -2024/11/24 13:43:47-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码