Classic introductory MapReduce examples
The WordCount project
Java implementation
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.xxxx</groupId>
<artifactId>hadoop-mr-wordcount-demo</artifactId>
<version>1.0</version>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<hadoop.version>3.1.2</hadoop.version>
<commons-io.version>2.4</commons-io.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>${hadoop.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>${hadoop.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>${hadoop.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-common</artifactId>
<version>${hadoop.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-core</artifactId>
<version>${hadoop.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-jobclient</artifactId>
<version>${hadoop.version}</version>
</dependency>
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<version>${commons-io.version}</version>
</dependency>
<dependency>
<groupId>com.janeluo</groupId>
<artifactId>ikanalyzer</artifactId>
<version>2012_u6</version>
</dependency>
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
<version>5.4.1</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>2.5.5</version>
<configuration>
<archive>
<manifest>
<mainClass>com.yjxxt.job.WordCountJob</mainClass>
</manifest>
</archive>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
<appendAssemblyId>false</appendAssemblyId>
</configuration>
</plugin>
</plugins>
</build>
</project>
Define the Job class
import cn.hutool.core.date.DatePattern;
import cn.hutool.core.date.DateUtil;
import com.yjxxt.mapper.WordCountMapper;
import com.yjxxt.reducer.WordCountReducer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.util.Date;
public class WordCountJob {
private static final int REDUCER_NUM = 2;
private static final String DATA_PATH = "/yjxxt/harry.txt";
public static void main(String[] args) throws Exception {
Configuration configuration=new Configuration(true);
Job job = Job.getInstance(configuration);
job.setJarByClass(WordCountJob.class);
String dateTimeStr = DateUtil.format(new Date(), DatePattern.PURE_DATETIME_FORMAT);
job.setJobName(String.format("wc_%s", dateTimeStr));
job.setNumReduceTasks(REDUCER_NUM);
FileInputFormat.setInputPaths(job, new Path(DATA_PATH));
FileOutputFormat.setOutputPath(job,
new Path(String.format("/yjxxt/result/wc_%s", dateTimeStr)));
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(LongWritable.class);
job.setMapperClass(WordCountMapper.class);
job.setReducerClass(WordCountReducer.class);
job.waitForCompletion(true);
}
}
Define the Mapper class
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
public class WordCountMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
private static final Logger LOGGER = LoggerFactory.getLogger(WordCountMapper.class);
@Override
protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, LongWritable>.Context context)
throws IOException, InterruptedException {
LOGGER.info("输入偏移量key = {}, 读取的行value = {}", key, value);
String newValue = value.toString().replaceAll("[^a-zA-Z0-9'\\s]", "");
LOGGER.info("替换特殊字符Value = {}", newValue);
String[] strArrays = newValue.split(" ");
Arrays.stream(strArrays).forEach(v -> {
try {
context.write(new Text(v), new LongWritable(1));
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
}
Define the Reducer class
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
public class WordCountReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
private static final Logger LOGGER = LoggerFactory.getLogger(WordCountReducer.class);
@Override
protected void reduce(Text key, Iterable<LongWritable> values, Reducer<Text, LongWritable,
Text, LongWritable>.Context context) throws IOException, InterruptedException {
long num = 0;
// iterate the values exactly once; calling values.iterator() on every loop round is fragile
for (LongWritable value : values) {
num += value.get();
}
LOGGER.info("key = {}, count = {}", key, num);
context.write(key, new LongWritable(num));
}
}
Configuration files placed in the resources folder
- core-site.xml
- hdfs-site.xml
- log4j.properties
- mapred-site.xml
- yarn-site.xml
Running on Linux
Build the fat jar with the maven-assembly-plugin (e.g. mvn clean package), upload it to the Linux server, and run: hadoop jar wordcount.jar com.yjxxt.job.WordCountJob
Weather data
Compute each region's daily maximum and minimum temperature
Define the Job class
import cn.hutool.core.date.DateUtil;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.util.Date;
public class WeatherCounterJob {
public static void main(String[] args) throws Exception {
Configuration configuration = new Configuration(true);
configuration.set("mapreduce.framework.name", "local");
String dateStr = DateUtil.format(new Date(), "yyyyMMddHHmmss");
String jobName = String.format("WeatherCounter-%s", dateStr);
Job weatherCounterJob = Job.getInstance(configuration, jobName);
weatherCounterJob.setJarByClass(WeatherCounterJob.class);
weatherCounterJob.setMapOutputKeyClass(Text.class);
weatherCounterJob.setMapOutputValueClass(IntWritable.class);
FileInputFormat.setInputPaths(weatherCounterJob,
"/yjxxt/data/area_weather.csv");
FileOutputFormat.setOutputPath(weatherCounterJob,
new Path(String.format("/yjxxt/result/%s", jobName)));
weatherCounterJob.setMapperClass(WeatherCounterMapper.class);
weatherCounterJob.setReducerClass(WeatherCounterReducer.class);
weatherCounterJob.waitForCompletion(true);
}
}
Define the Mapper class
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class WeatherCounterMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
@Override
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
// skip the CSV header row (the line at byte offset 0)
if (key.get() == 0) {
return;
}
String[] recode = value.toString().replaceAll("\"", "").split(",");
// key = province + city + adcode + ":" + date (recode[9] is "date time"; keep only the date part)
String outPutKey = String.format("%s%s%s:%s", recode[1],
recode[2], recode[3], recode[9].split(" ")[0]);
int temperature = Integer.parseInt(recode[5]);
context.write(new Text(outPutKey), new IntWritable(temperature));
}
}
Define the Reducer class
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
public class WeatherCounterReducer extends Reducer<Text, IntWritable, Text,IntWritable> {
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
int minTemperature = Integer.MAX_VALUE;
int maxTemperature = Integer.MIN_VALUE;
// single pass over the values; avoid calling values.iterator() on every loop round
for (IntWritable value : values) {
int temp = value.get();
minTemperature = Math.min(temp, minTemperature);
maxTemperature = Math.max(temp, maxTemperature);
}
Text outputValue = new Text(String.format("%s max temperature: %d, min temperature: %d", key, maxTemperature, minTemperature));
context.write(outputValue, null);
}
}
Top three monthly temperatures per region
- For each region, what are the daily maximum and minimum temperatures?
- For each region, what are the three highest temperatures in each month, and on which day did each occur?
The idea: build a composite key carrying province, city, adcode, date and temperature, sort it so that records of the same month are ordered by temperature descending, and add a grouping comparator that groups by (province, city, month); each reduce call then sees one month's records hottest-first, so the first three values are that month's top three temperatures.
Code implementation
Define the Weather composite-key class (WeatherWritable03)
import cn.hutool.core.date.DateUtil;
import com.google.common.base.Objects;
import lombok.*;
import org.apache.hadoop.io.WritableComparable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Date;
@Getter
@Setter
@NoArgsConstructor
@AllArgsConstructor
@Builder
public class WeatherWritable03 implements WritableComparable<WeatherWritable03> {
private String province;
private String city;
private String adcode;
private Date date;
private int temperature;
private String winddirection;
private String windpower;
private String weather;
private String humidity;
@Override
public int compareTo(WeatherWritable03 o) {
int result = this.province.compareTo(o.province);
if (result == 0) {
result = this.city.compareTo(o.city);
if (result == 0) {
result = this.adcode.compareTo(o.adcode);
if (result == 0) {
result = DateUtil.format(this.date, "yyyy-MM").
compareTo(DateUtil.format(o.date, "yyyy-MM"));
if (result == 0) {
result = o.temperature - this.temperature;
}
}
}
}
return result;
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
WeatherWritable03 that = (WeatherWritable03) o;
return temperature == that.temperature && Objects.equal(province, that.province)
&& Objects.equal(city, that.city) && Objects.equal(adcode, that.adcode)
&& Objects.equal(date, that.date) && Objects.equal(winddirection, that.winddirection)
&& Objects.equal(windpower, that.windpower) && Objects.equal(weather, that.weather)
&& Objects.equal(humidity, that.humidity);
}
@Override
public int hashCode() {
return Objects.hashCode(province, city, adcode, date, temperature, winddirection,
windpower, weather, humidity);
}
@Override
public String toString() {
return "WeatherWritable{" +
"province='" + province + '\'' +
", city='" + city + '\'' +
", adcode='" + adcode + '\'' +
", date='" + date + '\'' +
", temperature=" + temperature +
", winddirection='" + winddirection + '\'' +
", windpower='" + windpower + '\'' +
", weather='" + weather + '\'' +
", humidity='" + humidity + '\'' +
'}';
}
@Override
public void write(DataOutput out) throws IOException {
out.writeUTF(this.province);
out.writeUTF(this.city);
out.writeUTF(this.adcode);
out.writeLong(this.date.getTime());
out.writeInt(this.temperature);
out.writeUTF(this.weather);
out.writeUTF(this.windpower);
out.writeUTF(this.winddirection);
out.writeUTF(this.humidity);
}
@Override
public void readFields(DataInput in) throws IOException {
this.province = in.readUTF();
this.city = in.readUTF();
this.adcode = in.readUTF();
this.date = new Date(in.readLong());
this.temperature = in.readInt();
this.weather = in.readUTF();
this.windpower = in.readUTF();
this.winddirection = in.readUTF();
this.humidity = in.readUTF();
}
}
Define the WeatherGroupingComparator grouping comparator
import cn.hutool.core.date.DateUtil;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
public class WeatherGroupingComparator extends WritableComparator {
public WeatherGroupingComparator() {
super(WeatherWritable03.class, true);
}
@Override
public int compare(WritableComparable a, WritableComparable b) {
WeatherWritable03 w1 = (WeatherWritable03) a;
WeatherWritable03 w2 = (WeatherWritable03) b;
int result = w1.getProvince().compareTo(w2.getProvince());
if (result == 0) {
result = w1.getCity().compareTo(w2.getCity());
if (result == 0) {
result = DateUtil.format(w1.getDate(), "yyyy-MM")
.compareTo(DateUtil.format(w2.getDate(), "yyyy-MM"));
}
}
return result;
}
}
Define the Partitioner
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Partitioner;
public class WeatherWritablePartitioner extends Partitioner<WeatherWritable03, IntWritable> {
@Override
public int getPartition(WeatherWritable03 key, IntWritable value, int numPartitions) {
// trivial partitioner: every record goes to partition 0
// (note: the Job above never registers it via setPartitionerClass, so it is unused as written)
return 0;
}
}
Define the Job class
import cn.hutool.core.date.DateUtil;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.util.Date;
public class WeatherCounterJob03 {
public static void main(String[] args) throws Exception {
Configuration configuration = new Configuration(true);
configuration.set("mapreduce.framework.name", "local");
String dateStr = DateUtil.format(new Date(), "yyyyMMddHHmmss");
String jobName = String.format("WeatherCounter03-%s", dateStr);
Job weatherCounterJob = Job.getInstance(configuration, jobName);
weatherCounterJob.setJarByClass(WeatherCounterJob03.class);
weatherCounterJob.setMapOutputKeyClass(WeatherWritable03.class);
weatherCounterJob.setMapOutputValueClass(IntWritable.class);
weatherCounterJob.setGroupingComparatorClass(WeatherGroupingComparator.class);
FileInputFormat.setInputPaths(weatherCounterJob, "/yjxxt/data/area_weather.csv");
FileOutputFormat.setOutputPath(weatherCounterJob, new Path(String.format("/yjxxt/result/%s", jobName)));
weatherCounterJob.setMapperClass(WeatherCounterMapper03.class);
weatherCounterJob.setReducerClass(WeatherCounterReducer03.class);
weatherCounterJob.waitForCompletion(true);
}
}
Define the Mapper class
import cn.hutool.core.date.DateUtil;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
import java.util.Date;
public class WeatherCounterMapper03 extends Mapper<LongWritable, Text, WeatherWritable03, IntWritable> {
@Override
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
// skip the CSV header row
if (key.get() == 0) {
return;
}
String[] recode = value.toString().replaceAll("\"", "").split(",");
int temperature = Integer.parseInt(recode[5]);
// recode[9] holds the record's timestamp; truncate it to day precision
Date createTime = DateUtil.parse(recode[9], "dd/MM/yyyy HH:mm:ss");
Date yearAndMonthDate = DateUtil.parse(DateUtil.format(createTime, "yyyy-MM-dd"), "yyyy-MM-dd");
WeatherWritable03 weatherWritable = WeatherWritable03.builder().weather(recode[4])
.adcode(recode[3]).city(recode[2])
.date(yearAndMonthDate)
.humidity(recode[8]).province(recode[1])
.temperature(temperature).winddirection(recode[6]).windpower(recode[7])
.build();
context.write(weatherWritable, new IntWritable(temperature));
}
}
Define the Reducer class
import cn.hutool.core.date.DateUtil;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
import java.util.LinkedHashSet;
import java.util.Iterator;
import java.util.Set;
public class WeatherCounterReducer03 extends Reducer<WeatherWritable03, IntWritable, Text, NullWritable> {
@Override
protected void reduce(WeatherWritable03 key, Iterable<IntWritable> values, Context context)
throws IOException, InterruptedException {
// values arrive sorted by temperature descending (see WeatherWritable03.compareTo);
// the grouping comparator merges a whole (province, city, month) group into one reduce call,
// and Hadoop re-fills the key instance for each value, so key.getDate() is the current record's day
Set<String> set = new LinkedHashSet<>();
Iterator<IntWritable> iterator = values.iterator();
while (iterator.hasNext()) {
StringBuffer buffer = new StringBuffer();
buffer.append(key.getProvince()).append("|");
buffer.append(key.getCity()).append("|");
buffer.append(DateUtil.format(key.getDate(), "yyyy-MM-dd")).append("|");
buffer.append(iterator.next().get());
set.add(buffer.toString());
// stop after collecting the month's three highest temperatures
if (set.size() == 3) {
break;
}
}
for (String s : set) {
context.write(new Text(s), null);
}
}
}
Friend recommendation system
- Data volume
- Approach
- Process the data line by line
- Give the same recommended pair the same key, so that the reduce phase can aggregate it in one place
- Sample data
tom hello hadoop cat
world hello hadoop hive
cat tom hive
mr hive hello
hive cat hadoop world hello mr
hadoop tom hive world hello
hello tom world hive mr
Code implementation of friend recommendation
Analysis:
For example, 张家辉's friend list: 张家辉 王海泉 钟添添 张雨 彭玉丹 谢丽萍 肖娴 夏新新 程琦慧
---------------------------------------------------------------------------------------------------
Treat this line as a String array [张家辉, 王海泉, 钟添添, 张雨, 彭玉丹, 谢丽萍, 肖娴, 夏新新, 程琦慧]: names[0] is the current user and names[i] (i >= 1) are his friends; a number encodes the recommendation score.
Direct friends: record a score of 0 for each direct-friend pair
张家辉 王海泉 0  张家辉 钟添添 0  张家辉 张雨 0  ...
Indirect friends (two people who share a common friend): record a score of 1 for each pair
王海泉 钟添添 1  王海泉 张雨 1  王海泉 彭玉丹 1  王海泉 谢丽萍 1  王海泉 肖娴 1  王海泉 夏新新 1  王海泉 程琦慧 1
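A minimal plain-Java sketch (no Hadoop; the class name FriendPairSketch is made up for illustration) of how one input line expands into (sorted name pair, score) records, the same pairs the FriendMap below emits:

```java
import java.util.ArrayList;
import java.util.List;

// Expand one line of the friends file into (sorted pair, score) records:
// score 0 = the two people are already direct friends,
// score 1 = they share a common friend and are candidates for recommendation.
public class FriendPairSketch {
    public static void main(String[] args) {
        String line = "tom hello hadoop cat";          // sample line from the data above
        String[] names = line.split("\\s+");           // the real file is tab-separated
        List<String> records = new ArrayList<>();
        // names[0] paired with each of their friends -> direct friends, score 0
        for (int i = 1; i < names.length; i++) {
            records.add(sortPair(names[0], names[i]) + " 0");
        }
        // every two friends of names[0] share names[0] as a common friend -> score 1
        for (int i = 1; i < names.length; i++) {
            for (int j = i + 1; j < names.length; j++) {
                records.add(sortPair(names[i], names[j]) + " 1");
            }
        }
        records.forEach(System.out::println);
    }

    // order the two names so that (a, b) and (b, a) end up with the same key
    private static String sortPair(String n1, String n2) {
        return n1.compareTo(n2) > 0 ? n2 + "-" + n1 : n1 + "-" + n2;
    }
}
```

For the sample line this prints hello-tom 0, hadoop-tom 0, cat-tom 0 and then hadoop-hello 1, cat-hello 1, cat-hadoop 1; the reducer later discards any pair that ever received a 0.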
Define a Friend class (it implements both Writable and DBWritable, so it can be serialized to HDFS or written to MySQL)
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.lib.db.DBWritable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.util.Date;
import java.util.Objects;
public class Friend implements Writable, DBWritable{
private String id;
private String person;
private String friend;
private Integer count;
private Date createtime;
public Friend() {
}
public Friend(String id, String person, String friend, Integer count, Date createtime) {
this.id = id;
this.person = person;
this.friend = friend;
this.count = count;
this.createtime = createtime;
}
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
public String getPerson() {
return person;
}
public void setPerson(String person) {
this.person = person;
}
public String getFriend() {
return friend;
}
public void setFriend(String friend) {
this.friend = friend;
}
public Integer getCount() {
return count;
}
public void setCount(Integer count) {
this.count = count;
}
public Date getCreatetime() {
return createtime;
}
public void setCreatetime(Date createtime) {
this.createtime = createtime;
}
@Override
public String toString() {
return "Friend{" +
"id='" + id + '\'' +
", person='" + person + '\'' +
", friend='" + friend + '\'' +
", count=" + count +
", createtime=" + createtime +
'}';
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
Friend friend1 = (Friend) o;
return Objects.equals(id, friend1.id) &&
Objects.equals(person, friend1.person) &&
Objects.equals(friend, friend1.friend) &&
Objects.equals(count, friend1.count) &&
Objects.equals(createtime, friend1.createtime);
}
@Override
public int hashCode() {
return Objects.hash(id, person, friend, count, createtime);
}
@Override
public void write(DataOutput dataOutput) throws IOException {
dataOutput.writeUTF(this.id);
dataOutput.writeUTF(this.person);
dataOutput.writeUTF(this.friend);
dataOutput.writeInt(this.count);
dataOutput.writeLong(this.createtime.getTime());
}
@Override
public void readFields(DataInput dataInput) throws IOException {
this.id = dataInput.readUTF();
this.person = dataInput.readUTF();
this.friend = dataInput.readUTF();
this.count = dataInput.readInt();
this.createtime = new Date(dataInput.readLong());
}
@Override
public void write(PreparedStatement preparedStatement) throws SQLException {
preparedStatement.setString(1, this.id);
preparedStatement.setString(2, this.person);
preparedStatement.setString(3, this.friend);
preparedStatement.setInt(4, this.count);
preparedStatement.setTimestamp(5, new Timestamp(this.createtime.getTime()));
}
@Override
public void readFields(ResultSet resultSet) throws SQLException {
this.id = resultSet.getString(1);
this.person = resultSet.getString(2);
this.friend = resultSet.getString(3);
this.count = resultSet.getInt(4);
this.createtime = resultSet.getTimestamp(5);
}
}
Generate each person's friend list randomly with reservoir sampling
import org.apache.commons.io.FileUtils;
import java.io.File;
import java.io.IOException;
import java.util.*;
import java.util.stream.Collectors;
public class FriendRandomUtil {
public static void main(String[] args) throws IOException {
List<String> studentList = FileUtils.readLines(new File(FriendRandomUtil.class.getResource("/students.txt").getPath()), "UTF-8");
Map<String, Set<String>> friendMap = studentList.stream().collect(Collectors.toMap(e -> e, e -> new HashSet<>()));
for (String student : friendMap.keySet()) {
List<String> sampleList = FriendRandomUtil.reservoirSampling(studentList, new Random().nextInt(30) + 10);
friendMap.get(student).addAll(sampleList);
for (String friend : sampleList) {
friendMap.get(friend).add(student);
}
}
for (String student : friendMap.keySet()) {
System.out.print(student + "\t");
friendMap.get(student).stream().forEach(e -> System.out.print(e + "\t"));
System.out.println();
}
}
public static List<String> reservoirSampling(List<String> studentList, int num) {
// copy the first num elements: List.subList() is only a view, and writing through it
// would also overwrite the source list
List<String> sampleList = new ArrayList<>(studentList.subList(0, num));
Random random = new Random();
for (int i = num; i < studentList.size(); i++) {
// pick an index in [0, i]; element i survives with probability num / (i + 1)
int r = random.nextInt(i + 1);
if (r < num) {
sampleList.set(r, studentList.get(i));
}
}
return sampleList;
}
}
Define the FriendJob job class
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.lib.db.DBConfiguration;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
public class FriendJob {
private static String driverClass = "com.mysql.cj.jdbc.Driver";
private static String url = "jdbc:mysql://192.168.191.101:3306/friend?useUnicode=true&characterEncoding=utf8&serverTimezone=GMT%2B8";
private static String username = "root";
private static String password = "123456";
private static String tableName = "t_friends";
private static String[] fields = {"id", "person", "friend", "count", "createtime"};
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
Configuration configuration = new Configuration(true);
configuration.set("mapreduce.framework.name", "local");
DBConfiguration.configureDB(configuration, driverClass, url
, username, password);
Job job = Job.getInstance(configuration);
job.setJarByClass(FriendJob.class);
job.setJobName("Hello01Friend" + System.currentTimeMillis());
job.setNumReduceTasks(2);
// the JDBC settings above are not used by this job's output path; the example writes Friend.toString() lines to HDFS
FileInputFormat.setInputPaths(job, new Path("/yjxxt/friends.txt"));
FileOutputFormat.setOutputPath(job, new Path("/yjxxt/result/friend_"+System.currentTimeMillis()));
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
job.setMapperClass(FriendMap.class);
job.setReducerClass(FriendReducer.class);
job.waitForCompletion(true);
}
}
Define the FriendMap class
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class FriendMap extends Mapper<LongWritable, Text, Text, IntWritable> {
private static IntWritable one = new IntWritable(1);
private static IntWritable zero = new IntWritable(0);
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String[] names = value.toString().split("\t");
// names[0] paired with each of their friends: direct friends, score 0
for (int i = 1; i < names.length; i++) {
context.write(new Text(namesSort(names[0], names[i])), zero);
}
// every two friends of names[0] share a common friend: candidate recommendation, score 1
for (int i = 1; i < names.length; i++) {
for (int j = i + 1; j < names.length; j++) {
context.write(new Text(namesSort(names[i], names[j])), one);
}
}
}
private String namesSort(String name1, String name2) {
return name1.compareTo(name2) > 0 ? name2 + "-" + name1 : name1 + "-" + name2;
}
}
Define the FriendReducer class
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
import java.util.Date;
import java.util.Iterator;
import java.util.UUID;
public class FriendReducer extends Reducer<Text, IntWritable, Friend, NullWritable> {
private String jobName;
@Override
protected void setup(Context context) throws IOException, InterruptedException {
jobName = context.getJobName();
}
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
int count = 0;
Iterator<IntWritable> iterator = values.iterator();
while (iterator.hasNext()) {
int value = iterator.next().get();
// a score of 0 means the two people are already direct friends, so no recommendation is needed
if (value == 0) {
return;
}
count += value;
}
String[] names = key.toString().split("-");
Friend f1 = new Friend(jobName + UUID.randomUUID().toString().substring(0, 9), names[0], names[1], count, new Date());
Friend f2 = new Friend(jobName + UUID.randomUUID().toString().substring(0, 9), names[1], names[0], count, new Date());
context.write(f1, NullWritable.get());
context.write(f2, NullWritable.get());
}
}
PageRank
Basic concepts
PageRank was proposed by Sergey Brin and Larry Page at the WWW7 conference in 1998 to solve the page-ranking problem in link analysis.
- PageRank is the algorithm Google proposed to measure how important a particular page is relative to the other pages in the search engine's index
- PageRank turned the value of links into a ranking factor
- When a keyword is searched, pages are then displayed in order of their site weight
How the method works
Algorithm process
- Initially every site gets the same default weight (on, say, a 10-point or 100-point scale)
- A site splits its weight evenly among all of its outlinks (e.g. 10 / 5 = 2)
- If a site has several inlinks, the scores arriving over all inlinks in this round are added up (e.g. 2+4+7+1+10 = 24 points)
- This round's score is then redistributed over the outlinks in the next round (24 / 5 = 4.8)
- The process is repeated iteratively until the values converge
- The convergence criterion measures the precision of the current round, for example:
- more than 99.9% of the sites have the same PR value as in the previous round, or
- the average of all PR differences (this round vs. the previous one) does not exceed 0.0001
- once the criterion is met, stop iterating
Damping factor
Shortcomings of the algorithm
- First, it does not distinguish in-site navigation links. Many home pages contain a large number of links to other pages of the same site (in-site navigation links); compared with links between different sites, the latter reflect the transfer of PageRank value far better.
- Second, it does not filter advertising links and functional links. Such links usually carry little real value: the former lead to advertisement pages, the latter often lead to, say, a social-network home page.
- Third, it is unfriendly to new pages. A new page generally has few inlinks, so even if its content is of high quality, it still needs a long period of promotion before it reaches a high PR value.
- To address these shortcomings, the TrustRank algorithm was proposed. It originated from a 2004 joint study by Stanford University and Yahoo aimed at detecting spam sites. TrustRank first manually identifies high-quality "seed" pages; pages linked from seed pages are likely to be high quality too and get a high TR value, and the farther a page is from the seed pages along links, the lower its TR value. Seed pages can be chosen among pages with many outlinks or sites with high PR values.
- TrustRank assigns every page a TR value; combining the PR value with the TR value gives a more accurate judgment of a page's importance.
Example data
Execution flow analysis:
a 1 b d
b 1 c
c 1 a b
d 1 b c
Begin splitting
--------> Map
a 1 b d
b 0.5
d 0.5
b 1 c
c 1
c 1 a b
a 0.5
b 0.5
d 1 b c
b 0.5
c 0.5
---------> Reduce: result after the first reduction
a 0.5 b d
b 1.5 c
c 1.5 a b
d 0.5 b c
----------------> Map: second iteration .....
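The trace above can be reproduced with a minimal in-memory sketch (plain Java, no Hadoop, no damping factor yet; the class name is made up for illustration):

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Minimal in-memory sketch: every page starts with PR = 1.0, splits its PR
// evenly over its outlinks, and a page's new PR is the sum of what it receives.
public class SimplePageRankSketch {
    public static void main(String[] args) {
        Map<String, String[]> links = new LinkedHashMap<>();
        links.put("a", new String[]{"b", "d"});
        links.put("b", new String[]{"c"});
        links.put("c", new String[]{"a", "b"});
        links.put("d", new String[]{"b", "c"});

        Map<String, Double> pr = new LinkedHashMap<>();
        for (String page : links.keySet()) {
            pr.put(page, 1.0);
        }

        for (int iter = 1; iter <= 2; iter++) {
            Map<String, Double> next = new LinkedHashMap<>();
            for (String page : links.keySet()) {
                next.put(page, 0.0);
            }
            for (Map.Entry<String, String[]> entry : links.entrySet()) {
                // split the page's current PR evenly over its outlinks
                double share = pr.get(entry.getKey()) / entry.getValue().length;
                for (String target : entry.getValue()) {
                    next.put(target, next.get(target) + share);
                }
            }
            pr = next;
            // iteration 1 prints {a=0.5, b=1.5, c=1.5, d=0.5}, matching the reduce result above
            System.out.println("after iteration " + iter + ": " + pr);
        }
    }
}
```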
Analysis
1. Algorithm process analysis
- Initially every site gets the same default weight (assume an initial value of 1.0)
- A site splits its weight evenly among all of its outlinks (10 / 5 = 2)
- If a site has several inlinks, the scores arriving over all inlinks in this round are added up (2+4+7+1+10 = 24 points)
- This round's score is then redistributed over the outlinks in the next round (24 / 5 = 4.8)
- The process is repeated iteratively until the values converge
2. Logic analysis
Assume every site starts with a score of 1 and splits that score evenly among all of its outlinks. Once every site has distributed its score, each site sums the scores arriving over its inlinks; that sum becomes the site's score for the next round.
- The "Pixiu" (dangling node) problem: some sites have no outlinks, so score leaks out and the totals shrink with every round
- Reciprocal ("friendly") links: to avoid wasting their own score, two sites link to each other and keep passing score back and forth
Solution:
- Add a damping factor, d = 0.85, to counter the problems above; the computed values then gradually approach a stable point
- Convergence criterion: the PR difference across all sites does not exceed 0.0001
3. How the PR value is corrected once the damping factor is added
Corrected PageRank formula, with the following symbols:
- d: the damping factor, a constant 0.85
- M(i): the set of pages that link to page i (its inlink pages)
- L(j): the number of outlinks of page j (in the code, the String[] adjacentNodeNames array)
- PR(pj): the PR value of page j, a fixed number once each round has been computed
- n: the total number of pages
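Putting these symbols together, the standard damped PageRank update, which the reducer below hard-codes as 0.15 / 4.0 + 0.85 * sum for d = 0.85 and n = 4, is:

$$PR(p_i) = \frac{1-d}{n} + d \sum_{p_j \in M(i)} \frac{PR(p_j)}{L(j)}$$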
4. Design:
- Define a PageNode class that encapsulates a page's PR value and its outlinks, plus a factory method that turns a "pr + outlink string" value into a PageNode object.
- Decide the Map key and value: on the first iteration wrap the raw data into an object {pr, outlinks}; afterwards pass the previous round's PR value along with the page relations.
For example, the line "a 0.75 b d" carries page a's PR value and outlinks, i.e. key -> a, value -> "0.75 b d".
- In the Map phase, check whether the node has outlinks; if it does, compute the share each outlink receives this round: share = current page's PR / number of outlinks.
- In the Reduce phase, apply the formula above to aggregate the shares into each page's new PR value.
- In the Job, check whether the convergence criterion has been reached; once it has, break out of the iteration loop.
Code implementation
Define an enum used to accumulate the convergence counter
public enum MyCounter {
CONVERGENCESUM
}
Define a PageNode class that encapsulates a page's PR value and its outlinks
import org.apache.commons.lang.StringUtils;
import java.io.IOException;
import java.util.Arrays;
public class PageNode {
private double pageRank = 1.0;
private String[] adjacentNodeNames;
public static final char fieldSeparator = '\t';
public boolean containsAdjacentNodes() {
return adjacentNodeNames != null && adjacentNodeNames.length > 0;
}
public static PageNode fromMR(String value) throws IOException {
String[] parts = StringUtils.splitPreserveAllTokens(value, fieldSeparator);
if (parts.length < 1) {
throw new IOException("Expected 1 or more parts but received " + parts.length);
}
PageNode node = new PageNode().setPageRank(Double.valueOf(parts[0]));
if (parts.length > 1) {
node.setAdjacentNodeNames(Arrays.copyOfRange(parts, 1, parts.length));
}
return node;
}
public static PageNode fromMR(String v1, String v2) throws IOException {
return fromMR(v1 + fieldSeparator + v2);
}
public double getPageRank() {
return pageRank;
}
public PageNode setPageRank(double pageRank) {
this.pageRank = pageRank;
return this;
}
public String[] getAdjacentNodeNames() {
return adjacentNodeNames;
}
public PageNode setAdjacentNodeNames(String[] adjacentNodeNames) {
this.adjacentNodeNames = adjacentNodeNames;
return this;
}
@Override
public String toString() {
StringBuilder sb = new StringBuilder();
sb.append(pageRank);
if (getAdjacentNodeNames() != null) {
sb.append(fieldSeparator).append(StringUtils.join(getAdjacentNodeNames(), fieldSeparator));
}
return sb.toString();
}
}
Define the PageRankJob class
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class PageRankJob {
private static double convergence = 0.0001;
public static void main(String[] args) {
Configuration configuration = new Configuration(true);
configuration.set("mapreduce.app-submission.cross-platform", "true");
configuration.set("mapreduce.framework.name", "local");
int runCount = 0;
while (true) {
runCount++;
try {
configuration.setInt("runCount", runCount);
FileSystem fs = FileSystem.get(configuration);
Job job = Job.getInstance(configuration);
job.setJarByClass(PageRankJob.class);
job.setJobName("pagerank-" + runCount);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
job.setMapperClass(PageRankMapper.class);
job.setReducerClass(PageRankReducer.class);
job.setInputFormatClass(KeyValueTextInputFormat.class);
Path inputPath = new Path("/yjxxt/pagerank/input/");
if (runCount > 1) {
inputPath = new Path("/yjxxt/pagerank/output/pr" + (runCount - 1));
}
FileInputFormat.addInputPath(job, inputPath);
Path outpath = new Path("/yjxxt/pagerank/output/pr" + runCount);
if (fs.exists(outpath)) {
fs.delete(outpath, true);
}
FileOutputFormat.setOutputPath(job, outpath);
boolean flag = job.waitForCompletion(true);
if (flag) {
System.out.println("--------------------------success." + runCount);
long sum = job.getCounters().findCounter(MyCounter.CONVERGENCESUM).getValue();
// the reducer accumulates |ΔPR| * 1000 per page; with 4 pages, dividing by 4 * 1000
// turns the counter back into this round's average PR difference
double avgConvergence = sum / 4000.0;
if (avgConvergence < convergence) {
break;
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
Define the PageRankMapper class
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class PageRankMapper extends Mapper<Text, Text, Text, Text> {
protected void map(Text key, Text value, Context context) throws IOException, InterruptedException {
int runCount = context.getConfiguration().getInt("runCount", 1);
String page = key.toString();
PageNode node = null;
if (runCount == 1) {
node = PageNode.fromMR("1.0", value.toString());
} else {
node = PageNode.fromMR(value.toString());
}
// re-emit the page's own structure so the reducer can recover its outlink list
context.write(new Text(page), new Text(node.toString()));
if (node.containsAdjacentNodes()) {
// distribute the current PR evenly over the outlinks
double outValue = node.getPageRank() / node.getAdjacentNodeNames().length;
for (int i = 0; i < node.getAdjacentNodeNames().length; i++) {
String outPage = node.getAdjacentNodeNames()[i];
context.write(new Text(outPage), new Text(outValue + ""));
}
}
}
}
Define the PageRankReducer class
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
public class PageRankReducer extends Reducer<Text, Text, Text, Text> {
protected void reduce(Text key, Iterable<Text> iterable, Context context) throws IOException, InterruptedException {
double sum = 0.0;
PageNode sourceNode = null;
for (Text i : iterable) {
PageNode node = PageNode.fromMR(i.toString());
if (node.containsAdjacentNodes()) {
sourceNode = node;
} else {
sum = sum + node.getPageRank();
}
}
// (1 - d) / n + d * sum, with damping factor d = 0.85 and n = 4 pages in this example
double newPR = (0.15 / 4.0) + (0.85 * sum);
System.out.println(key + "*********** new pageRank value is " + newPR);
// accumulate |ΔPR| * 1000 into a counter; the Job reads it to test for convergence
double d = newPR - sourceNode.getPageRank();
int j = Math.abs((int) (d * 1000.0));
context.getCounter(MyCounter.CONVERGENCESUM).increment(j);
sourceNode.setPageRank(newPR);
context.write(key, new Text(sourceNode.toString()));
}
}