数据导入
1.project组数据
1.create database sqooptest1
2.use sqooptest1
3.create table project(
id int not null auto_increment primary key,
name varchar(100) not null,
type tinyint(4) not null default 0,
description varchar(500) default null,
create_at date default null,
update_at timestamp not null default current_timestamp on update current_timestamp,
status tinyint(4) not null default 0
);
4.insert into project( name,type,description,create_at,status)
values( 'project1',1,'project1 zy','2019-07-27',0);
insert into project( name,type,description,create_at,status)
values( 'project2',1,'project2 zy','2019-07-26',0);
insert into project( name,type,description,create_at,status)
values( 'project2',2,'project2 zy','2019-07-25',0);
sqoop import --connect jdbc:mysql://node3:3306/sqooptest1 --username root --password a --table project
- 结果
2.students组数据
1.create database if not exists sqooptest1(上一节已创建过该库时不会报错)
2.use sqooptest1
3.create table students(
id int not null primary key,
name varchar(100) not null,
age varchar(100) not null
);
E:\JAVA课程\...\11.Hadoop\12.Sqoop\a.txt
import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.InputStreamReader;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import java.util.Scanner;
/**
 * Bulk-loads a tab-separated text file into the {@code students} table of the
 * {@code sqooptest1} MySQL database using JDBC batch inserts.
 *
 * <p>Each input line is expected to contain three tab-separated fields which
 * are bound, in order, to the columns {@code id}, {@code name} and {@code age}.
 * Rows are sent to the server in batches and every batch is committed on its
 * own, so a failure only rolls back the batch that was in flight.
 */
public class AddBatchMysql {

    /** Number of rows sent to the server (and committed) per batch. */
    private static final int BATCH_SIZE = 1000;

    /** Explicit column list so the insert does not depend on the table's column order. */
    private static final String INSERT_SQL = "insert into students(id,name,age) values(?,?,?)";

    /**
     * Entry point: asks for the data file path on stdin, reads all lines into
     * memory, then inserts them in batches.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        // Not closed on purpose: closing the Scanner would also close System.in.
        Scanner sc = new Scanner(System.in);
        System.out.println("文件位置:");
        String path = sc.nextLine();

        List<String> list = new ArrayList<String>();
        try (BufferedReader br = new BufferedReader(new InputStreamReader(new FileInputStream(new File(path))))) {
            String str;
            while ((str = br.readLine()) != null) {
                list.add(str);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        System.out.println("数据总条数:" + list.size());

        // try-with-resources closes both the statement and the connection,
        // including when an exception escapes the insert loop.
        try (Connection con = DriverManager.getConnection(
                     "jdbc:mysql://node3:3306/sqooptest1?serverTimezone=UTC", "root", "a");
             PreparedStatement pstmt = con.prepareStatement(INSERT_SQL)) {
            con.setAutoCommit(false);
            try {
                int total = 0;
                int pending = 0; // rows added to the current, not yet executed batch
                for (int i = 0; i < list.size(); i++) {
                    String[] ss = list.get(i).split("\t");
                    if (ss.length < 3) {
                        // A blank or malformed line must not abort the whole load.
                        System.err.println("跳过格式错误的行(第" + (i + 1) + "行):" + list.get(i));
                        continue;
                    }
                    pstmt.setString(1, ss[0]);
                    pstmt.setString(2, ss[1]);
                    pstmt.setString(3, ss[2]);
                    pstmt.addBatch();
                    if (++pending == BATCH_SIZE) {
                        // executeBatch() resets the statement's batch to empty.
                        total += sum(pstmt.executeBatch());
                        con.commit();
                        pending = 0;
                    }
                }
                if (pending > 0) {
                    // Flush the final, partially filled batch.
                    total += sum(pstmt.executeBatch());
                    con.commit();
                }
                System.out.println("实际插入数据条数:" + total);
            } catch (Exception e) {
                e.printStackTrace();
                try {
                    // Undo the uncommitted batch; earlier batches stay committed.
                    con.rollback();
                } catch (SQLException e1) {
                    e1.printStackTrace();
                }
            }
        } catch (SQLException e) {
            // Connecting, preparing the statement or closing the resources failed.
            e.printStackTrace();
        }
    }

    /**
     * Adds up the per-statement update counts returned by
     * {@link PreparedStatement#executeBatch()}.
     *
     * <p>Positive counts are added as-is. {@code SUCCESS_NO_INFO} means the
     * statement succeeded but the driver did not report a row count; because
     * every statement in this program inserts exactly one row it is counted
     * as 1. Any other non-positive value (e.g. {@code EXECUTE_FAILED}) adds 0.
     *
     * <p>Package-private (rather than private) so it can be unit-tested.
     *
     * @param res update counts, may be {@code null} or empty
     * @return number of rows inserted; 0 for {@code null} or empty input
     */
    static int sum(int[] res) {
        if (res == null || res.length == 0) {
            return 0;
        }
        int total = 0;
        for (int count : res) {
            if (count > 0) {
                total += count;
            } else if (count == java.sql.Statement.SUCCESS_NO_INFO) {
                total += 1;
            }
        }
        return total;
    }
}
sqoop-list-
1.列库
sqoop-list-databases --connect jdbc:mysql://localhost:3306/mysql?serverTimezone=UTC --username root --password a --verbose
--verbose:工作时打印更多信息
2.列表
sqoop-list-tables --connect jdbc:mysql://localhost:3306/mysql?serverTimezone=UTC --username root --password a --verbose
sqoop import-
1.指定路径:--target-dir
sqoop import --connect jdbc:mysql://node3:3306/sqooptest1?serverTimezone=UTC --username root --password a --table project --target-dir /sqooptest/input1/project
指定目录:/sqooptest/input1/project
实际目录:/sqooptest/input1/project
- 结果
2.表名当成数据仓库名:--warehouse-dir
sqoop import --connect jdbc:mysql://node3:3306/sqooptest1?serverTimezone=UTC --username root --password a --table project --warehouse-dir /sqooptest/input2
指定目录:/sqooptest/input2
实际目录:/sqooptest/input2/project
分析:指定目录下创建一个名字为表名的目录,此时这个表名就当成了一个数据仓库名(warehouse)
- 结果
3.指定要查询的列与查询条件
sqoop import --connect jdbc:mysql://node3:3306/sqooptest1?serverTimezone=UTC --username root --password a --table project --warehouse-dir /sqooptest/input3 --columns 'id,name,type' --where 'id>2' -m 1
--table
--columns
--where
-m:表示只用到一个mapper,一个mapper对应一个切片,对应一个输出文件
因为用了 --table,Sqoop 会自动拼装 SQL 语句,所以不能与 -e 或 --query 合用
4.指定sql语句
sqoop import --connect jdbc:mysql://node3:3306/sqooptest1?serverTimezone=UTC --username root --password a --target-dir /sqooptest/input4/project --query 'select id,name,type from project where id>2 and $CONDITIONS' --split-by project.id -m 1
--query:不能与--table, --columns合用
$CONDITIONS:占位符,使用 --query 时 where 子句中必须包含它;Sqoop 运行时会把它替换成每个 mapper 各自的切分条件(按 --split-by 指定的列划分)
--split-by:用于拆分工作单元的表格列,不能与 --autoreset-to-one-mapper选项一起使用
-m:表示只用到一个mapper, 一个mapper对应一个切片,对应一个输出文件
5.--direct
sqoop import --connect jdbc:mysql://node3:3306/sqooptest1?serverTimezone=UTC --username root --password a --table project --warehouse-dir /sqooptest/input5 --direct -m 1
--direct使用mysqldump命令完成导入工作,因为是集群,map任务是分配到每个节点运行,所以每个节点都要有mysqldump命令
6.增量导入
--check-column:检查的列
--incremental:指定增量导入模式,即 Sqoop 如何判断哪些行是新数据,可取以下两个值
append:追加
lastmodified:最后一次修改
--last-value:上次导入检索的最大值
insert into project( name,type,description,create_at,status)
values( 'project5',5,'project5 zy','2019-07-25',0);
insert into project( name,type,description,create_at,status)
values( 'project6',5,'project5 zy','2019-07-25',0);
insert into project( name,type,description,create_at,status)
values( 'project7',5,'project5 zy','2019-07-25',0);
insert into project( name,type,description,create_at,status)
values( 'project8',5,'project5 zy','2019-07-25',0);
sqoop import --connect jdbc:mysql://node3:3306/sqooptest1?serverTimezone=UTC --username root --password a --table project --warehouse-dir /sqooptest/input6 -m 1 --check-column id --incremental append --last-value 3
- 结果
insert into project( name,type,description,create_at,status)
values( 'project9',5,'project5 zy','2019-07-25',0);
insert into project( name,type,description,create_at,status)
values( 'project10',5,'project5 zy','2019-07-25',0);
insert into project( name,type,description,create_at,status)
values( 'project11',5,'project5 zy','2019-07-25',0);
insert into project( name,type,description,create_at,status)
values( 'project12',5,'project5 zy','2019-07-25',0);
sqoop import --connect jdbc:mysql://node3:3306/sqooptest1?serverTimezone=UTC --username root --password a --table project --warehouse-dir /sqooptest/input6 -m 1 --check-column id --incremental append --last-value 7
- 结果(注意:截图中的输出目录有误,正确的输出目录应为 input6)
sqoop import --connect jdbc:mysql://node3:3306/sqooptest1?serverTimezone=UTC --username root --password a --table project --warehouse-dir /sqooptest/input8 -m 1 --check-column update_at --incremental lastmodified --last-value "2022-06-29 15:46:12" --append
--append:将数据追加到 HDFS 中的现有数据集
- 结果
sqoop job-
1.语法格式
sqoop job (generic-args) (job-args) [-- [subtool-name] (subtool-args)]
注意--后有空格
2.创建任务,导入sqooptest1库中project表所有内容到hadoop
sqoop import --connect jdbc:mysql://node3:3306/sqooptest1?serverTimezone=UTC --username root --password a --table project --target-dir /sqooptest/input9/project -m 1
sqoop job --create yc-job1 -- import --connect jdbc:mysql://node3:3306/sqooptest1?serverTimezone=UTC --username root --password a --table project --target-dir /sqooptest/input10/project -m 1
1.任务已经存在了,请更改任务名或删除掉原任务.
2.Caused by: java.lang.ClassNotFoundException: org.json.JSONObject
缺少jar包(org.json.json),将java-json.jar包上传到sqoop/lib包下
sqoop job --list
sqoop job --show yc-job1
sqoop job --exec yc-job1
- 查看执行结果
3.使用密码文件登录数据库
- 以上创建任务时,提示MySQL的密码输入, 阻塞了自动化运行
- 官方7.2.1提示配置密码文件
- 创建密码隐藏文件
echo -n "a" >/root/.mysql.password
chmod 400 /root/.mysql.password
ls -al
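sqoop job --create yc-job2 -- import --connect jdbc:mysql://node3:3306/sqooptest1?serverTimezone=UTC --username root --password-file file:///root/.mysql.password --table project --target-dir /sqooptest/input11/project -m 1
(注:原文命令在 file:/ 处被截断;密码文件路径取自上面创建的 /root/.mysql.password,其后的 --table、--target-dir、-m 参数为参照 yc-job1 补全的示例,目录名请按实际情况调整)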
sqoop job --list
sqoop job --show yc-job2
sqoop job --exec yc-job2
- 查看执行结果
4.创建追加导入任务
- 到mysql中查看一下project表的id最大值
- 插入一些新数据
insert into project( name,type,description,create_at,status)
values( 'project12',5,'project5 zy','2019-07-25',0);
insert into project( name,type,description,create_at,status)
values( 'project13',5,'project5 zy','2019-07-25',0);
insert into project( name,type,description,create_at,status)
values( 'project14',5,'project5 zy','2019-07-25',0);
insert into project( name,type,description,create_at,status)
values( 'project15',5,'project5 zy','2019-07-25',0);
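sqoop job --create yc-job3 -- import --connect jdbc:mysql://node3:3306/sqooptest1?serverTimezone=UTC --username root --password-file file:///root/.mysql.password --table project --target-dir /sqooptest/input12/project -m 1 --check-column id --incremental append --last-value <插入新数据前project表的最大id>
(注:原文命令在 file:/ 处被截断;密码文件路径取自上面创建的 /root/.mysql.password,其后的参数为参照前面"增量导入"一节补全的示例,目录名和 --last-value 请按实际情况填写)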
sqoop job --list
sqoop job --exec yc-job3
- 查看执行结果
- 再插入一些新数据
insert into project( name,type,description,create_at,status)
values( 'project16',5,'project5 zy','2019-07-25',0);
insert into project( name,type,description,create_at,status)
values( 'project17',5,'project5 zy','2019-07-25',0);
sqoop job --exec yc-job3
- 查看执行结果
- **输出结果表明,这个 job 底层有一个叫 metastore 的元数据库(Sqoop 的 metastore 基于 HSQLDB)存储当前 id 的最新值,以便下一次从此处继续导入。这方便了定时任务,不用程序员自己记录更新到哪一条数据了**
5.定时作业
1.oozie,azkaban框架***
2.编写定时程序(Thread类,java.util.TimerTask类,Quartz定时器框架->cron表达式)
3.centos自带的crontab实现****
- 第三种方案的实现
- /usr/local/bin下创建sqoop_incremental.sh定时任务脚本文件
cd /usr/local/bin
vim sqoop_incremental.sh
#!/bin/bash
# Runs the saved Sqoop job yc-job3 in the background and appends its output to a log file.

# Absolute path to the sqoop launcher, so the command is found even when it is not on PATH.
sqoop_bin=/usr/local/sqoop147/bin/sqoop
# File that collects everything the job prints.
log_file=/usr/local/sqoop147/myjob.out

# >>   : append standard output to the log file
# 2>&1 : send standard error to the same place as standard output
# &    : run the job as a background process
"$sqoop_bin" job --exec yc-job3 >>"$log_file" 2>&1 &
crontab -e
#每5分钟执行一次
*/5 * * * * /usr/bin/bash /usr/local/bin/sqoop_incremental.sh
#格式:分 时 日 月 周 命令
insert into project( name,type,description,create_at,status)
values( 'project18',5,'project5 zy','2019-07-25',0);
- 等待五分钟左右
- 查看日志文件:/usr/local/sqoop147/myjob.out
- 查看执行结果
|