本人使用的软件:IDEA、Hadoop 2.7.1、Docker 20.2.1、Spring Boot 2.7.1
1. 使用 Docker 下载镜像并启动 Hadoop 容器
docker run --name=my-hadoop -it --privileged=true -p 50070:50070 -p 8088:8088 -p 50075:50075 -p 9000:9000 sequenceiq/hadoop-docker:2.7.1 /etc/bootstrap.sh -bash
(注意:--privileged=true 等 docker run 参数必须写在镜像名之前;写在镜像名之后会被当作容器内命令 /etc/bootstrap.sh 的参数,不会生效。)
2. 通过 jps 命令查看 Hadoop 各进程的启动情况
3. Hadoop 各端口的说明可参考:
https://ambari.apache.org/1.2.5/installing-hadoop-using-ambari/content/reference_chap2_1.html
4. 通过 Spring Boot 连接 Hadoop,进行文件夹/文件创建以及文本读写操作
4.1 pom 依赖
<!--hadoop依赖-->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>3.1.1</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>3.1.1</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-core</artifactId>
<version>3.1.1</version>
</dependency>
4.2 application.yml 配置
# HDFS 配置(path、user 必须缩进在 hdfs 下,对应代码中的 ${hdfs.path} 和 ${hdfs.user})
hdfs:
  path: hdfs://你的主机ip:9000
  user: root
4.3 Controller 代码
package com.example.demo.hadoop;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.multipart.MultipartFile;
/**
 * Demo REST endpoints for HDFS operations, delegating to {@link HdfsService}.
 */
@RestController
@RequestMapping("/test/hdfs")
public class HdfsController {
    @Autowired
    private HdfsService service;

    /**
     * Creates an HDFS directory.
     *
     * @param path directory to create
     * @return "1" when {@link HdfsService#mkdir} reports success (the directory already existed or
     *     was created), "2" when it reports failure (e.g. a blank path) or throws
     */
    @GetMapping("/mkdir")
    public String mkdir(String path) {
        try {
            // Honour the service's result instead of assuming success: mkdir returns false for a
            // blank path or when mkdirs fails.
            return service.mkdir(path) ? "1" : "2";
        } catch (Exception e) {
            System.out.println(e);
            return "2";
        }
    }

    // Endpoints disabled in the original tutorial; they call HdfsService#createFile and
    // HdfsService#readFileToString.
    // @PostMapping("/createFile")
    // public Object createFile(String path, MultipartFile file){
    // try {
    // service.createFile(path, file);
    // return 1;
    // } catch (Exception e) {
    // return e;
    // }
    // }
    //
    // @GetMapping("/readFileToString")
    // public Object readFileToString(String path){
    // try {
    // return service.readFileToString(path);
    // } catch (Exception e) {
    // return e;
    // }
    // }
}
4.4 Service 代码
package com.example.demo.hadoop;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import org.springframework.web.multipart.MultipartFile;
import java.io.BufferedReader;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
 * Thin wrapper around the Hadoop {@link FileSystem} client for common HDFS operations.
 *
 * <p>Every method opens its own {@link FileSystem} handle via {@link #getFileSystem()} and closes
 * it (with any streams) in try-with-resources, so handles are released even when an operation
 * throws.
 */
@Service
public class HdfsService {
    /** HDFS URI, e.g. {@code hdfs://host:9000} (property {@code hdfs.path}). */
    @Value("${hdfs.path}")
    private String hdfsPath;

    /** User name the client acts as (property {@code hdfs.user}). */
    @Value("${hdfs.user}")
    private String user;

    /**
     * Builds the Hadoop client configuration with {@code fs.defaultFS} set to {@link #hdfsPath}.
     *
     * @return a new configuration
     */
    private Configuration getConfiguration() {
        Configuration configuration = new Configuration();
        configuration.set("fs.defaultFS", hdfsPath);
        return configuration;
    }

    /**
     * Opens a file system for {@link #hdfsPath}, acting as {@link #user}.
     *
     * <p>The caller owns the returned instance and must close it.
     * NOTE(review): {@code FileSystem.get(uri, conf, user)} presumably builds a fresh remote-user
     * identity per call, so each instance is a separate cache entry that is only released on
     * {@code close()} — confirm against the Hadoop version in use.
     *
     * @return an open file system handle
     * @throws Exception if the URI is invalid or the connection cannot be set up
     */
    public FileSystem getFileSystem() throws Exception {
        return FileSystem.get(new URI(hdfsPath), getConfiguration(), user);
    }

    /**
     * Creates an HDFS directory (and any missing parents).
     *
     * @param dir directory path
     * @return {@code false} for a blank path; {@code true} if it already exists; otherwise the
     *     result of {@link FileSystem#mkdirs(Path)}
     * @throws Exception on HDFS errors
     */
    public boolean mkdir(String dir) throws Exception {
        if (StringUtils.isBlank(dir)) {
            return false;
        }
        if (exist(dir)) {
            return true;
        }
        try (FileSystem fileSystem = getFileSystem()) {
            return fileSystem.mkdirs(new Path(dir));
        }
    }

    /**
     * Checks whether a path exists in HDFS.
     *
     * @param path file or directory path
     * @return {@code false} for a blank path, otherwise whether the path exists
     * @throws Exception on HDFS errors
     */
    public boolean exist(String path) throws Exception {
        if (StringUtils.isBlank(path)) {
            return false;
        }
        try (FileSystem fileSystem = getFileSystem()) {
            return fileSystem.exists(new Path(path));
        }
    }

    /**
     * Lists the direct children of a path.
     *
     * <p>Each entry has {@code "filePath"} (a {@link Path} object) and {@code "fileStatus"}
     * (the status's {@code toString()}).
     *
     * @param path directory (or file) path
     * @return the entries, or {@code null} if the path does not exist or has no entries
     * @throws Exception on HDFS errors
     */
    public List<Map<String, Object>> readPathInfo(String path) throws Exception {
        if (!exist(path)) {
            return null;
        }
        try (FileSystem fs = getFileSystem()) {
            FileStatus[] statuses = fs.listStatus(new Path(path));
            if (statuses == null || statuses.length < 1) {
                return null;
            }
            List<Map<String, Object>> list = new ArrayList<>(statuses.length);
            for (FileStatus fileStatus : statuses) {
                Map<String, Object> map = new HashMap<>();
                map.put("filePath", fileStatus.getPath());
                map.put("fileStatus", fileStatus.toString());
                list.add(map);
            }
            return list;
        }
    }

    /**
     * Writes an uploaded file into the directory {@code path}, overwriting an existing file.
     *
     * <p>Does nothing if {@code path} is blank or {@code file} is {@code null}.
     *
     * @param path target HDFS directory
     * @param file uploaded file; its original name is used as the HDFS file name
     * @throws Exception on read or HDFS errors
     */
    public void createFile(String path, MultipartFile file) throws Exception {
        if (StringUtils.isBlank(path) || null == file) {
            return;
        }
        String fileName = file.getOriginalFilename();
        if (fileName != null) {
            // The name is client-supplied and may carry a directory portion (e.g. a full Windows
            // path); keep only the last segment so the file lands directly under `path`.
            int lastSeparator = Math.max(fileName.lastIndexOf('/'), fileName.lastIndexOf('\\'));
            fileName = fileName.substring(lastSeparator + 1);
        }
        Path filePath = new Path(path + "/" + fileName);
        // Stream the upload instead of loading it fully into memory with getBytes().
        try (FileSystem fs = getFileSystem();
             InputStream in = file.getInputStream();
             FSDataOutputStream outputStream = fs.create(filePath)) {
            IOUtils.copyLarge(in, outputStream);
        }
    }

    /**
     * Reads a whole HDFS file as text, keeping its line separators.
     *
     * <p>NOTE(review): assumes the file is UTF-8 encoded.
     *
     * @param path file path
     * @return the file content, or {@code null} if the path does not exist
     * @throws Exception on HDFS errors
     */
    public String readFileToString(String path) throws Exception {
        if (!exist(path)) {
            return null;
        }
        try (FileSystem fs = getFileSystem();
             FSDataInputStream inputStream = fs.open(new Path(path))) {
            return IOUtils.toString(inputStream, StandardCharsets.UTF_8);
        }
    }

    /**
     * Recursively lists all files (not directories) under a path.
     *
     * <p>Each entry has {@code "filePath"} (full path string) and {@code "fileName"}.
     *
     * @param path root path
     * @return the entries, or {@code null} if the path does not exist
     * @throws Exception on HDFS errors
     */
    public List<Map<String, Object>> listFiles(String path) throws Exception {
        if (!exist(path)) {
            return null;
        }
        try (FileSystem fs = getFileSystem()) {
            // The iterator talks to HDFS lazily, so it must be drained before fs is closed.
            RemoteIterator<LocatedFileStatus> iterator = fs.listFiles(new Path(path), true);
            List<Map<String, Object>> list = new ArrayList<>();
            while (iterator.hasNext()) {
                LocatedFileStatus fileStatus = iterator.next();
                Map<String, Object> map = new HashMap<>();
                map.put("filePath", fileStatus.getPath().toString());
                map.put("fileName", fileStatus.getPath().getName());
                list.add(map);
            }
            return list;
        }
    }

    /**
     * Renames (moves) an HDFS path.
     *
     * @param oldName existing path
     * @param newName new path
     * @return {@code false} if {@code oldName} does not exist or {@code newName} is blank,
     *     otherwise the result of {@link FileSystem#rename(Path, Path)}
     * @throws Exception on HDFS errors
     */
    public boolean renameFile(String oldName, String newName) throws Exception {
        if (!exist(oldName) || StringUtils.isBlank(newName)) {
            return false;
        }
        try (FileSystem fs = getFileSystem()) {
            return fs.rename(new Path(oldName), new Path(newName));
        }
    }

    /**
     * Deletes an HDFS path immediately, recursing into directories.
     *
     * @param path file or directory path
     * @return {@code false} if the path does not exist, otherwise whether the delete succeeded
     * @throws Exception on HDFS errors
     */
    public boolean deleteFile(String path) throws Exception {
        if (!exist(path)) {
            return false;
        }
        try (FileSystem fs = getFileSystem()) {
            // delete(), not deleteOnExit(): the latter only queues the path until close() and
            // reports "marked", not "deleted".
            return fs.delete(new Path(path), true);
        }
    }

    /**
     * Copies a local file to HDFS.
     *
     * <p>Does nothing if either path is blank.
     *
     * @param path local source path
     * @param uploadPath HDFS destination path
     * @throws Exception on I/O or HDFS errors
     */
    public void uploadFile(String path, String uploadPath) throws Exception {
        if (StringUtils.isBlank(path) || StringUtils.isBlank(uploadPath)) {
            return;
        }
        try (FileSystem fs = getFileSystem()) {
            fs.copyFromLocalFile(new Path(path), new Path(uploadPath));
        }
    }

    /**
     * Copies an HDFS file to the local file system.
     *
     * <p>Does nothing if either path is blank.
     *
     * @param path HDFS source path
     * @param downloadPath local destination path
     * @throws Exception on I/O or HDFS errors
     */
    public void downloadFile(String path, String downloadPath) throws Exception {
        if (StringUtils.isBlank(path) || StringUtils.isBlank(downloadPath)) {
            return;
        }
        try (FileSystem fs = getFileSystem()) {
            fs.copyToLocalFile(new Path(path), new Path(downloadPath));
        }
    }

    /**
     * Copies an HDFS file to another HDFS path, overwriting the target.
     *
     * <p>Does nothing if either path is blank.
     *
     * @param sourcePath HDFS source file
     * @param targetPath HDFS target file
     * @throws Exception on HDFS errors
     */
    public void copyFile(String sourcePath, String targetPath) throws Exception {
        if (StringUtils.isBlank(sourcePath) || StringUtils.isBlank(targetPath)) {
            return;
        }
        // Resources close in reverse order: output, input, then the file system.
        try (FileSystem fs = getFileSystem();
             FSDataInputStream inputStream = fs.open(new Path(sourcePath));
             FSDataOutputStream outputStream = fs.create(new Path(targetPath))) {
            IOUtils.copyLarge(inputStream, outputStream);
        }
    }

    /**
     * Reads a whole HDFS file into a byte array.
     *
     * @param path file path
     * @return the file bytes, or {@code null} if the path does not exist
     * @throws Exception on HDFS errors
     */
    public byte[] readFileToBytes(String path) throws Exception {
        if (!exist(path)) {
            return null;
        }
        try (FileSystem fs = getFileSystem();
             FSDataInputStream inputStream = fs.open(new Path(path))) {
            return IOUtils.toByteArray(inputStream);
        }
    }

    /**
     * Returns the block locations of a file across the cluster.
     *
     * @param path file path
     * @return the block locations, or {@code null} if the path does not exist
     * @throws Exception on HDFS errors
     */
    public BlockLocation[] getFileBlockLocations(String path) throws Exception {
        if (!exist(path)) {
            return null;
        }
        try (FileSystem fs = getFileSystem()) {
            FileStatus fileStatus = fs.getFileStatus(new Path(path));
            return fs.getFileBlockLocations(fileStatus, 0, fileStatus.getLen());
        }
    }
}
5. 运行成功,其他方法不再一一演示。
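注意:
若客户端(即 IDEA 中运行的程序)连接 Hadoop 时出现 Connection refused,通常是 9000 端口存在问题,请注意排查,具体可参考:
ConnectionRefused - HADOOP2 - Apache Software Foundation
另外,mkdir 只需访问 NameNode;读写文件内容时,客户端还需要直接连接 DataNode(Hadoop 2.x 默认数据传输端口为 50010)。上面的 docker run 命令没有映射该端口,读写文件前需要一并处理。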