HDFS 自定义实现函数将文件追加到末尾的问题:
一、实验环境:
- Ubuntu16.04
- Hadoop2.7.1 伪分布式(
只有一个DN ) - Eclipse
二、解决方案
Java代码:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import java.io.*;
public class HDFSApi {
public static boolean test(Configuration conf, String path) throws IOException {
FileSystem fs = FileSystem.get(conf);
return fs.exists(new Path(path));
}
public static void copyFromLocalFile(Configuration conf, String localFilePath, String remoteFilePath) throws IOException {
FileSystem fs = FileSystem.get(conf);
Path localPath = new Path(localFilePath);
Path remotePath = new Path(remoteFilePath);
fs.copyFromLocalFile(false, true, localPath, remotePath);
fs.close();
}
public static void appendToFile(Configuration conf, String localFilePath, String remoteFilePath) throws IOException {
FileSystem fs = FileSystem.get(conf);
Path remotePath = new Path(remoteFilePath);
FileInputStream in = new FileInputStream(localFilePath);
FSDataOutputStream out = fs.append(remotePath);
byte[] data = new byte[1024];
int read = -1;
while ( (read = in.read(data)) > 0 ) {
out.write(data, 0, read);
}
out.close();
in.close();
fs.close();
}
public static void main(String[] args) {
Configuration conf = new Configuration();
conf.set("fs.default.name","hdfs://localhost:9000");
String localFilePath = "/home/hadoop/text.txt";
String remoteFilePath = "/user/hadoop/text.txt";
String choice = "append";
try {
Boolean fileExists = false;
if (HDFSApi.test(conf, remoteFilePath)) {
fileExists = true;
System.out.println(remoteFilePath + " 已存在.");
} else {
System.out.println(remoteFilePath + " 不存在.");
}
if ( !fileExists) {
HDFSApi.copyFromLocalFile(conf, localFilePath, remoteFilePath);
System.out.println(localFilePath + " 已上传至 " + remoteFilePath);
} else if ( choice.equals("overwrite") ) {
HDFSApi.copyFromLocalFile(conf, localFilePath, remoteFilePath);
System.out.println(localFilePath + " 已覆盖 " + remoteFilePath);
} else if ( choice.equals("append") ) {
HDFSApi.appendToFile(conf, localFilePath, remoteFilePath);
System.out.println(localFilePath + " 已追加至 " + remoteFilePath);
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
报错信息:Failed to replace a bad datanode the existing pipeline to no more good datanodes begin g available to try.
直观判定为文件在pineline传输中DN被认为是坏的数据节点,需要新的好的数据节点来确保文件在pineline中传输正常。
官网说明:hdfs-default.xml 配置文件
如果写入管道中存在数据节点/网络故障,DFSClient 将尝试从管道中删除失败的数据节点,然后继续使用其余数据节点进行写入。因此,管道中的数据节点数会减少。该功能是向管道添加新的数据节点。这是用于启用/禁用该功能的站点范围的属性(dfs.client.block.write.replace-datanode-on-failure.policy) 。当集群大小非常小时(例如 3 个节点或更少 ),集群管理员可能希望在默认配置文件中将策略设置为 NEVER 或禁用此功能。否则,用户可能会遇到异常高的管道故障率,因为无法找到新的数据节点进行替换。
而且,仅当 dfs.client.block.write.replace-datanode-on-failure.enable 的值为 true 时,才使用此属性。ALWAYS :删除现有数据节点时,始终添加新的数据节点。NEVER :从不添加新的数据节点。默认值:让 r 作为复制编号。设 n 为现有数据节点的数量。仅当 r 大于或等于 3 且 (1) floor(r/2) 大于或等于 n 时,才添加新的数据节点;或 (2) r 大于 n,并且块被hflushed/appended。
方法一:在Java代码main函数中加入以下两行代码:
conf.set("dfs.client.block.write.replace-datanode-on-failure.policy","NEVER");
conf.set("dfs.client.block.write.replace-datanode-on-failure.enable","true");
方法二:在hdfs-site.xml中加入以下代码:
<property>
<name>dfs.client.block.write.replace-datanode-on-failure.policy</name>
<value>NEVER</value>
</property>
三、注意点
一般来说,如果集群中DN个数小于等于3 (本机器采用伪分布式模式,只有一个DN,但是为了测试方便,直接开启即可)都不建议开启
|