Preface: everything below is introductory only, a record of my own code. Anything not explained in detail will be filled in later.
Hadoop
The input to map is always LongWritable and Text, which can be understood as the byte offset of the line and the line content as a String. Core idea: the key and value that map outputs are the key and value that reduce receives as input.
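As an illustration of that type contract — a sketch only, where DemoTypesMain, MyMapper and MyReducer are placeholder names, not classes from the jobs below: when the map output types differ from the final output types, Hadoop's Job also provides setMapOutputKeyClass/setMapOutputValueClass; the drivers in this post simply call setOutputKeyClass/setOutputValueClass for both stages because the types happen to be the same.
// Illustrative driver sketch only — DemoTypesMain, MyMapper and MyReducer are placeholder names.
public static void main(String[] args) throws Exception {
    Job job = Job.getInstance(new Configuration());
    job.setJarByClass(DemoTypesMain.class);
    job.setMapperClass(MyMapper.class);                 // Mapper<LongWritable, Text, Text, LongWritable>
    // Map output types = reduce input types
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(LongWritable.class);
    job.setReducerClass(MyReducer.class);               // Reducer<Text, LongWritable, Text, LongWritable>
    // Final (reduce) output types
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(LongWritable.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
}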
1. Sum
Main class
public static void main(String[] args) throws Exception {
    // 1. Initialize the configuration
    Configuration configuration = new Configuration();
    // 2. Initialize the job
    Job job = Job.getInstance(configuration);
    job.setJarByClass(PriceSumMain.class);
    // 3. Configure the map stage (every map task works on a split of the input file)
    job.setMapperClass(PriceSumMapper.class);
    // Set the map-stage output types
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(LongWritable.class);
    // 4. Configure the reduce stage
    job.setReducerClass(PriceSumReduce.class);
    // Set the reduce output types
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(LongWritable.class);
    // 5. Set the input and output paths
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    // 6. Submit the job
    System.exit(job.waitForCompletion(true) ? 0 : 1);
}
mapper
// Sum of prices
// offset, line of data -> output key, output value
public class PriceSumMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Get the current line
        String line = value.toString();
        // Split the line
        String[] items = line.split(",");
        // Get the price column (index 1, i.e. the second field)
        int num = Integer.parseInt(items[1]);
        context.write(new Text("num"), new LongWritable(num));
    }
}
reduce
public class PriceSumReduce extends Reducer<Text, LongWritable, Text, LongWritable> {
    protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
        long sum = 0L;
        for (LongWritable a : values) {
            sum += a.get();
        }
        context.write(new Text("Total:"), new LongWritable(sum));
    }
}
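One optional tweak that is not in the original code: because addition is associative and commutative, a sum job like this can usually also register the same reducer class as a combiner, so partial sums are computed on the map side before the shuffle. A one-line sketch in the driver:
// Optional map-side pre-aggregation (assumption: combiner output types equal the map output types, which holds here)
job.setCombinerClass(PriceSumReduce.class);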
2. Sorting
Sorting comes in two kinds: by number and by letter. Sorting in MapReduce is applied to the key; numeric sorting is ascending by default.
Main class
public static void main(String[] args) throws Exception {
    Configuration configuration = new Configuration();
    Job job = Job.getInstance(configuration);
    job.setJarByClass(SortSalaryMain.class);
    // Sort by number (the salary is the map output key)
    job.setMapperClass(SortSalaryByNumMapper.class);
    job.setOutputKeyClass(IntWritable.class);
    job.setOutputValueClass(Text.class);
    // Numeric sorting needs no reduce stage
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
}
mapper
// Sort employees by salary
public class SortSalaryByNumMapper extends Mapper<LongWritable, Text, IntWritable, Text> {
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Get the current line
        String line = value.toString();
        // Split the line
        String[] emps = line.split(",");
        // Employee name
        String name = emps[1];
        // Employee salary
        int salary = Integer.parseInt(emps[5]);
        context.write(new IntWritable(salary), new Text(name));
    }
}
Sort by letters (using a custom comparator)
Main class
public static void main(String[] args) throws Exception {
    Configuration configuration = new Configuration();
    Job job = Job.getInstance(configuration);
    job.setJarByClass(SortSalaryByCharMain.class);
    job.setMapperClass(SortSalaryByNumMapper.class);
    job.setOutputKeyClass(IntWritable.class);
    job.setOutputValueClass(Text.class);
    // Plug in the custom sort comparator
    job.setSortComparatorClass(SortSalaryComparator.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
}
Sort comparator
// Extends WritableComparator
public class SortSalaryComparator extends WritableComparator {
    // public class SortSalaryComparator implements RawComparator {   // alternative: implement RawComparator directly
    // The constructor calls the superclass to tell the comparator which key type it compares
    public SortSalaryComparator() {
        super(IntWritable.class, true);
    }

    // Override the compare method: the default order is ascending; negate it for descending (custom compare rule)
    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        // Downcast to the concrete key type
        IntWritable xa = (IntWritable) a;
        IntWritable xb = (IntWritable) b;
        // This works:
        return -super.compare(a, b);
        // So does this:
        // return -xa.compareTo(xb);
    }
}
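The comparator above still compares numeric IntWritable keys, only in reverse. To genuinely sort by letters (the second kind of sort mentioned at the start of this section), the map output key would have to be Text, for example the employee name, and the comparator would be declared over Text. A rough sketch under that assumption, not code from the original project:
// Sketch only: an alphabetical sort assumes a mapper such as Mapper<LongWritable, Text, Text, IntWritable>
// that writes context.write(new Text(name), new IntWritable(salary)).
public class SortByNameComparator extends WritableComparator {
    public SortByNameComparator() {
        // Tell the comparator the key type it will compare
        super(Text.class, true);
    }

    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        Text ta = (Text) a;
        Text tb = (Text) b;
        // Lexicographic (alphabetical) order; negate the result for descending order
        return ta.compareTo(tb);
    }
}
It would be wired into the driver the same way, with job.setSortComparatorClass(SortByNameComparator.class) and the mapper's output key/value types swapped accordingly.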
3. Deduplication
Main class
public static void main(String[] args) throws Exception {
    Configuration configuration = new Configuration();
    Job job = Job.getInstance(configuration);
    job.setJarByClass(DistinctJobMain.class);
    job.setMapperClass(DistinctJobMapper.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(NullWritable.class);
    job.setReducerClass(DistinctJobReduce.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(NullWritable.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
}
mapper
public class DistinctJobMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String line = value.toString();
        String[] items = line.split(",");
        // Get the employee's job title
        String job = items[2];
        // Because of the reduce semantics (identical keys end up in the same group), the field to deduplicate is emitted as the key
        context.write(new Text(job), NullWritable.get());
    }
}
reduce
public class DistinctJobReduce extends Reducer<Text, NullWritable, Text, NullWritable> {
    protected void reduce(Text key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
        // Identical keys form one group and trigger a single reduce call, which filters out the duplicate job titles
        context.write(key, NullWritable.get());
    }
}
4. Partitioning
Requirement: partition by salary into the ranges 0–1500, 1500–5000, 5000–10000, and 10000 and above.
Main class
// Partition employees by salary
public static void main(String[] args) throws Exception {
    // 1. Initialize the configuration
    Configuration configuration = new Configuration();
    // 2. Initialize the job
    Job job = Job.getInstance(configuration);
    job.setJarByClass(PartitionBySalaryMain.class);
    // 3. Configure the map stage (every map task works on a split of the input file)
    job.setMapperClass(PartitionMapper.class);
    // Set the map-stage output types
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    // 4. Configure the reduce stage
    job.setReducerClass(PartitionReduce.class);
    // Set the reduce output types
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    // Set the partitioner
    job.setPartitionerClass(EmpPartitionBySalary.class);
    // Set the number of reduce tasks: one per salary range (0, 1500, 5000, 10000)
    job.setNumReduceTasks(4);
    // 5. Set the input and output paths
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    // 6. Submit the job
    System.exit(job.waitForCompletion(true) ? 0 : 1);
}
mapper
public class PartitionMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String line = value.toString();
        String[] items = line.split(",");
        // Employee salary
        int salary = Integer.parseInt(items[5]);
        // Employee name
        String name = items[1];
        context.write(new Text(name), new IntWritable(salary));
    }
}
reduce
public class PartitionReduce extends Reducer<Text, IntWritable, Text, IntWritable> {
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        for (IntWritable value : values) {
            context.write(key, value);
        }
    }
}
Partitioner class
public class EmpPartitionBySalary extends Partitioner<Text, IntWritable> {
    @Override
    public int getPartition(Text name, IntWritable salary, int numPartition) {
        // Partition by salary: 0–1500, 1500–5000, 5000–10000, 10000 and above
        // The modulo keeps the return value inside [0, numPartition) even if fewer reduce tasks are configured
        int sal = salary.get();
        if (sal < 1500) {
            return 1 % numPartition;
        } else if (sal < 5000) {
            return 2 % numPartition;
        } else if (sal < 10000) {
            return 3 % numPartition;
        }
        return 4 % numPartition;
    }
}
5. Join
Main class
/**
 * Join the employee information from two files
 */
public class JoinInfoMain {
    public static void main(String[] args) throws Exception {
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration);
        job.setJarByClass(JoinInfoMain.class);
        job.setMapperClass(JoinInfoMapper.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        job.setReducerClass(JoinInfoReduce.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
mapper
// When analyzing several files together, be clear about which field you are joining on
public class JoinInfoMapper extends Mapper<LongWritable, Text, Text, Text> {
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String line = value.toString();
        String[] items = line.split(",");
        // The number of fields in the line tells us which file it came from, and therefore which columns to read
        if (items.length == 8) {
            String name = items[1];
            String job = items[2];
            context.write(new Text(job), new Text(name));
        } else if (items.length == 3) {
            String job = items[0];
            String desc = items[2]; // job description
            context.write(new Text(job), new Text("--" + desc));
        }
        // Package the job and inspect the map output first, then design the reduce.
        // For the join, both data sets must share the same key (the job title); identical keys are then grouped in reduce:
        // context.write(new Text(job), new Text(name));        first data set
        // context.write(new Text(job), new Text("--" + desc)); second data set
    }
}
reduce
public class JoinInfoReduce extends Reducer<Text, Text, Text, Text> {
    protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        // Collects the names of all employees holding this job
        StringBuffer names = new StringBuffer();
        // Job description
        String jobInfo = "";
        for (Text value : values) {
            // Each value is one record the mapper emitted for this key
            String msg = value.toString();
            // Values containing "--" are job descriptions
            boolean flag = msg.contains("--");
            if (flag) {
                jobInfo = msg;
            } else {
                // Concatenate all employee names
                names.append(msg + "、");
            }
        }
        // Emit the joined record
        context.write(new Text(jobInfo), new Text(names.toString()));
    }
}
6. Top N
To be filled in later.
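Until then, here is a hedged sketch of one common top-N pattern: each mapper keeps a small sorted map of its local top N and emits it in cleanup(), and a single reducer (numReduceTasks = 1) merges the per-mapper candidates the same way. The class name, N, and the column indexes are assumptions for illustration, not taken from the original data set.
// Sketch of a map-side top-N (N = 3 here); assumes the salary is field 5 and the name field 1 of a
// comma-separated line, like the other examples in this post. Uses java.util.TreeMap / java.util.Map.
public class TopNMapper extends Mapper<LongWritable, Text, IntWritable, Text> {
    private static final int N = 3;
    // Sorted by salary; the smallest entry is evicted once more than N are held
    // (ties on salary collapse in this simple sketch)
    private final TreeMap<Integer, String> topN = new TreeMap<>();

    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String[] items = value.toString().split(",");
        topN.put(Integer.parseInt(items[5]), items[1]); // salary -> name
        if (topN.size() > N) {
            topN.remove(topN.firstKey()); // drop the current smallest salary
        }
    }

    // Emit only this mapper's local top N once all of its input has been processed
    protected void cleanup(Context context) throws IOException, InterruptedException {
        for (Map.Entry<Integer, String> e : topN.entrySet()) {
            context.write(new IntWritable(e.getKey()), new Text(e.getValue()));
        }
    }
}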
Maven dependencies
<dependencies>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-common</artifactId>
        <version>3.1.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>3.1.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-hdfs</artifactId>
        <version>3.1.2</version>
    </dependency>
</dependencies>
Usage
Order of the Hadoop daemons: NameNode --- DataNode --- SecondaryNameNode
1. Copy the file from Windows to the Linux machine; WinSCP is enough for this.
2. Upload the file from the local Linux filesystem to HDFS:
hdfs dfs -put <local file> <path on HDFS>
3. Run the job:
hadoop jar <path to the jar> <package name + main class> <input data set on HDFS> <HDFS output path>
For example: hadoop jar /usr/mydata/restuemp-1.0.0.jar com.mypartition.PartitionBySalaryMain /data/newemp.csv /out1754
Possible problems and their fixes
Take Hadoop out of safe mode:
hadoop dfsadmin -safemode leave
Hive
1. Common commands
(-e) Run a statement without entering the hive shell: hive -e "select id from student;"
(-f) Run SQL from a script file: touch hivef.sql
Run the SQL in the file: hive -f hivef.sql
Run the SQL in the file and write the result to a file:
hive -f hivef.sql > result-hive.txt
HDFS can be browsed from inside hive:
dfs -ls /;
Exit: quit; (exit directly) or exit; (implicitly commit data first, then exit)
View local files from inside hive:
! ls /root/tools;
In the home directory, cat .hivehistory shows all hive commands run so far.
A line used constantly: row format delimited fields terminated by '\t'
2. Data operations
Create a partitioned table
Note: the partition column must not be an existing column of the table; think of it as a pseudo column.
create table dept_partition(
deptno int, dname string, loc string
)
partitioned by (month string)
row format delimited fields terminated by '\t';
Load data into the partitioned table
Note: when loading data into a partitioned table, the partition must be specified.
load data local inpath '/root/tools/dept.txt' into table dept_partition partition(month='20210722');
Query one partition
select * from dept_partition where month='20210722';
Query several partitions with a union
select * from dept_partition where month='20210721'
union
select * from dept_partition where month='20210722'
union
select * from dept_partition where month='20210723';
Create a table with two partition levels
create table dept_partition2(
deptno int, dname string, loc string
)
partitioned by (month string, day string)
row format delimited fields terminated by '\t';
Load a local file into a hive table
load data local inpath '/root/emp.txt' into table emp;
Load an HDFS file into hive, overwriting the data already in the table
load data inpath '/data/student.txt' overwrite into table student;
Insert data into a table from a query
Write the query result to a local directory; remove local to write to an HDFS path instead
insert overwrite local directory '/usr/local/distribute-result' ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' select * from emp distribute by deptno sort by idno desc;
Import data into a given hive table
import table student2 partition(month='201709') from
'/user/hive/warehouse/export/student';
Exporting data
insert export (the example writes to a local directory; omit local to export to HDFS)
insert overwrite local directory '/opt/module/datas/export/student1'
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' select * from student;
Hadoop command: use get to download from HDFS to the local filesystem
hdfs dfs -get resource_path_name target_path_name
Export from the hive shell
hive -e 'select * from default.student;' > /data/student4.txt;
export: export to HDFS
export table default.student to '/data/student';
export and import are mainly used to migrate Hive tables between two Hadoop clusters.
3. Table operations
When creating a table, location specifies the path the data is loaded from
create external table if not exists student5(
id int, name string
)row format delimited fields terminated by '\t'
location '/student';
Rename a table
ALTER TABLE table_name RENAME TO new_table_name
Drop a table
drop table table_name
Truncate a table: truncate can only clear managed (internal) tables, not the data of external tables
truncate table student;
Add a single partition
alter table dept_partition add partition(month='201706') ;
Add several partitions at once
alter table dept_partition add partition(month='201705') partition(month='201704');
Drop a partition (to drop several at once, separate them with commas)
alter table dept_partition drop partition (month='201704');
List the partitions of a partitioned table
show partitions dept_partition;
Show the structure of a partitioned table
desc formatted dept_partition;
ADD appends a new column after all existing columns (but before the partition columns); REPLACE replaces all columns of the table.
Add a column
alter table dept_partition add columns(deptdesc string);
Replace the columns
alter table dept_partition replace columns(deptno string, dname
string, loc string);
Change a column
alter table dept_partition change column deptdesc desc int;
Sqoop
Sqoop is an open-source tool mainly used to move data between Hadoop (Hive) and traditional databases (MySQL, PostgreSQL, ...). It can import data from a relational database (e.g. MySQL, Oracle, Postgres) into HDFS, and export data from HDFS back into a relational database.
From Hive to MySQL, i.e. HDFS --> MySQL (my export file is named course)
sqoop export \
--connect jdbc:mysql://192.168.233.133:3306/company \
--username root \
--password Su#2021Gjj \
--table deptsumsalary \
--num-mappers 1 \
--export-dir /output/emp/course \
--input-fields-terminated-by ","
MySQL to HDFS
sqoop import \
--connect jdbc:mysql://192.168.233.133:3306/xyxy \
--username root \
--password Su#2021Gjj \
--table tbl_student \
--target-dir /output/tbl_student \
--delete-target-dir \
--num-mappers 1 \
--fields-terminated-by ","
MySQL to Hive. The table is written to the default database by default; for another database, prefix the Hive table name with the database name.
sqoop import \
--connect jdbc:mysql://192.168.233.133:3306/xyxy \
--username root \
--password Su#2021Gjj \
--table tbl_student \
--num-mappers 1 \
--hive-import \
--fields-terminated-by "," \
--hive-overwrite \
--hive-table tbl_student_hive
A comma and a tab work equally well as the field delimiter here.
Scala
Spark itself is written in Scala.
Spark
Spark SQL
import java.text.SimpleDateFormat
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

case class userview(userid: String, productid: String, ptypeid: String, beh: String, time: String)

object SparkDemo {
  def main(args: Array[String]): Unit = {
    val appName = "demo"
    val master = "local[*]"
    val conf = new SparkConf().setAppName(appName).setMaster(master)
    val spark = SparkSession.builder().config(conf).getOrCreate()
    val sc = spark.sparkContext
    val data = sc.textFile("E:\\java\\workplace2018\\studyknowlege\\sparkdemo\\src\\main\\resources\\UserBehavior.csv")
    import spark.implicits._
    val userDF = data.map { line =>
      val array = line.split(",")
      userview(array(0), array(1), array(2), array(3), array(4))
    }.toDF()
    userDF.createOrReplaceTempView("uv")

    def formateDate(timestamp: String): String = {
      val simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
      simpleDateFormat.format(timestamp.toLong * 1000).split(" ")(0)
    }

    spark.udf.register("formateDate", formateDate _)
    val datanew = spark.sql("select userid,productid,ptypeid,beh,formateDate(time) from uv where userid = 1").show()
    val userbehDF = spark.sql("select beh,count(beh) from uv group by beh")
    userbehDF.write.format("jdbc")
      .option("url", "jdbc:mysql://192.168.233.133:3306/company")
      .option("dbtable", "userbeh")
      .option("user", "root")
      .option("password", "Su#2021Gjj")
      .mode("Overwrite")
      .save()
    sc.stop()
    spark.stop()
  }
}
Spark Streaming
package com.sparkStream

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object SparkStreamDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
    val ssc = new StreamingContext(conf, Seconds(10))
    val lines = ssc.textFileStream("hdfs://192.168.233.133:9000/sparkStream/")
    val wordCounts = lines.flatMap(_.split(" ")).map(word => (word, 1)).reduceByKey(_ + _)
    wordCounts.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
Appendix
Once the data has been processed, render it on a page.
Key jQuery code
<script src="./static/jquery.min.js" charset="utf-8"></script>
$(function () {
    $.get("./servletdemo", function (data, status) {
        var result = JSON.parse(data);
        var x = new Array();
        var y = new Array();
        for (var i = 0; i < result.length; i++) {
            x.push(result[i].deptno);
            y.push(result[i].sumsalary);
        }
        console.log(x);
        console.log(y);
    });
});
Another example
// Build objects of the form {value: 60, name: 'visit'}, i.e. records = [{...}, {...}, {...}]
var records = Array();
for (var i = 0; i < result.length; i++) {
    records.push({"value": result[i].counts, "name": result[i].beh});
}
console.log("new records:", records);
Key ECharts code
For reference, see the Runoob (菜鸟教程) tutorial on ECharts.
<div id="main1" style="width: 600px;height:400px;"></div>
<script src="./static/echarts.min.js" charset="utf-8"></script>
$(function () {
    $.get("./servletdemo", function (data, status) {
        var result = JSON.parse(data);
        var x = new Array();
        var y = new Array();
        for (var i = 0; i < result.length; i++) {
            x.push(result[i].deptno);
            y.push(result[i].sumsalary);
        }
        console.log(x);
        console.log(y);
        // Initialize the echarts instance on the prepared DOM node
        var myChart1 = echarts.init(document.getElementById('main1'));
        // Chart options and data
        var option1 = {
            title: {
                text: '部门员工薪资总和'
            },
            tooltip: {},
            legend: {
                data: ['薪资总和']
            },
            xAxis: {
                data: x
            },
            yAxis: {},
            series: [{
                name: '薪资总和', // must match the legend entry for the legend to display
                type: 'bar',
                data: y
            }]
        };
        // Render the chart with the options above
        myChart1.setOption(option1);
    });
});