code:
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.hbase.{HBaseConfiguration, KeyValue, TableName}
import org.apache.hadoop.hbase.client.{Admin, Connection, ConnectionFactory, Table}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapred.TableOutputFormat
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2
import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.util.ThreadUtil.getResourceAsStream

import java.util.Properties

object BulkLoadDataToHBase {

  def main(args: Array[String]): Unit = {
    // args(0): HDFS directory that contains the generated HFiles
    // args(1): name of the target HBase table
    System.setProperty("user.name", "hdfs")

    // Load additional settings from conf.properties on the classpath
    val properties = new Properties
    properties.load(getResourceAsStream("conf.properties"))

    val hBaseConf: Configuration = HBaseConfiguration.create()
    hBaseConf.addResource("hbase-site.xml")
    hBaseConf.addResource("core-site.xml")
    hBaseConf.addResource("yarn-site.xml")

    // Compress map output and job output with Snappy
    hBaseConf.set("mapred.compress.map.output", "true")
    hBaseConf.set("mapred.map.output.compression.codec", "org.apache.hadoop.io.compress.SnappyCodec")
    hBaseConf.set("mapred.output.compress", "true")
    hBaseConf.set("mapred.output.compression.codec", "org.apache.hadoop.io.compress.SnappyCodec")
    // Allow up to 128 HFiles per region per column family in a single bulk load
    hBaseConf.set("hbase.mapreduce.bulkload.max.hfiles.perRegion.perFamily", "128")

    val outTableName: String = args(1)
    hBaseConf.set(TableOutputFormat.OUTPUT_TABLE, outTableName)

    val conn: Connection = ConnectionFactory.createConnection(hBaseConf)
    val admin: Admin = conn.getAdmin
    val outTable: Table = conn.getTable(TableName.valueOf(outTableName))

    try {
      // Configure partitioning/compression for HFileOutputFormat2; the HFiles
      // themselves are expected to already exist under args(0)
      val job: Job = Job.getInstance(hBaseConf)
      job.setMapOutputKeyClass(classOf[ImmutableBytesWritable])
      job.setMapOutputValueClass(classOf[KeyValue])
      val regionLocator = conn.getRegionLocator(TableName.valueOf(outTableName))
      HFileOutputFormat2.configureIncrementalLoad(job, outTable, regionLocator)

      // Move the HFiles into the regions of the target table
      val bulkLoader = new LoadIncrementalHFiles(hBaseConf)
      bulkLoader.doBulkLoad(new Path(args(0)), admin, outTable, regionLocator)
    } finally {
      outTable.close()
      admin.close()
      conn.close()
    }
  }
}
After compiling and packaging, copy the jar to the server and run it with java -classpath XXX.jar XXX.BulkLoadDataToHBase /path outputTable.
Before running the program, the target table has to be created in HBase first:
create 'tmp_v1', {NAME => 'T', DATA_BLOCK_ENCODING => 'FAST_DIFF', COMPRESSION => 'SNAPPY', METADATA => {'DISABLE_WAL' => 'true', 'IMMUTABLE_ROWS' => 'true'}}
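For reference, a minimal sketch of creating the same table through the HBase 2.x Admin API might look like this (the FAST_DIFF encoding and SNAPPY compression mirror the shell command above; the METADATA attributes are omitted):

import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.{ColumnFamilyDescriptorBuilder, ConnectionFactory, TableDescriptorBuilder}
import org.apache.hadoop.hbase.io.compress.Compression
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding
import org.apache.hadoop.hbase.util.Bytes

object CreateTmpV1 {
  def main(args: Array[String]): Unit = {
    val conn = ConnectionFactory.createConnection(HBaseConfiguration.create())
    val admin = conn.getAdmin
    val tableName = TableName.valueOf("tmp_v1")
    try {
      if (!admin.tableExists(tableName)) {
        // Column family 'T' with FAST_DIFF block encoding and Snappy compression
        val family = ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes("T"))
          .setDataBlockEncoding(DataBlockEncoding.FAST_DIFF)
          .setCompressionType(Compression.Algorithm.SNAPPY)
          .build()
        admin.createTable(TableDescriptorBuilder.newBuilder(tableName).setColumnFamily(family).build())
      }
    } finally {
      admin.close()
      conn.close()
    }
  }
}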
Running the program at this point fails with the following error:
CorruptHFileException: Problem reading HFile Trailer from file XXXXXXXX/XXXXXXX/
at org.apache.hadoop.hbase.io.hfile.HFile.pickReaderVersion(HFile.java:495)
at org.apache.hadoop.hbase.io.hfile.HFile.createReader(HFile.java:538)
at org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles.groupOrSplit(LoadIncrementalHFiles.java:661)
at org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles$3.call(LoadIncrementalHFiles.java:574)
at org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles$3.call(LoadIncrementalHFiles.java:571)
at java.util.concurrent.FutureTask.run(FutureTask.java:262)
... 3 more
Caused by: java.lang.UnsatisfiedLinkError: org.apache.hadoop.util.NativeCodeLoader.buildSupportsSnappy()Z
at org.apache.hadoop.util.NativeCodeLoader.buildSupportsSnappy(Native Method)
at org.apache.hadoop.io.compress.SnappyCodec.checkNativeCodeLoaded(SnappyCodec.java:63)
at org.apache.hadoop.io.compress.SnappyCodec.getDecompressorType(SnappyCodec.java:195)
at org.apache.hadoop.io.compress.CodecPool.getDecompressor(CodecPool.java:181)
at org.apache.hadoop.hbase.io.compress.Compression$Algorithm.getDecompressor(Compression.java:328)
at org.apache.hadoop.hbase.io.compress.Compression.decompress(Compression.java:423)
at org.apache.hadoop.hbase.io.encoding.HFileBlockDefaultDecodingContext.prepareDecoding(HFileBlockDefaultDecodingContext.java:90)
at org.apache.hadoop.hbase.io.hfile.HFileBlock.unpack(HFileBlock.java:549)
at org.apache.hadoop.hbase.io.hfile.HFileBlock$AbstractFSReader$1.nextBlock(HFileBlock.java:1380)
at org.apache.hadoop.hbase.io.hfile.HFileBlock$AbstractFSReader$1.nextBlockWithBlockType(HFileBlock.java:1386)
at org.apache.hadoop.hbase.io.hfile.HFileReaderV2.<init>(HFileReaderV2.java:150)
at org.apache.hadoop.hbase.io.hfile.HFile.pickReaderVersion(HFile.java:483)
... 8 more
Caused by: java.lang.UnsatisfiedLinkError: org.apache.hadoop.util.NativeCodeLoader.buildSupportsSnappy()Z
The cause of the error is that the libsnappy.so.* native libraries cannot be found at runtime. I checked the dependencies packaged into the jar and the corresponding classes were all there. Searching on Google turned up the suggestion to add the -Djava.library.path=/XXX/hadoop/native/ option when running the jar, which had solved the problem for others.
So I ran find / -name native on the server and found the path /opt/cloudera/parcels/CDH-6.3.0-1.cdh6.3.0.p0.1279813/lib/hadoop/lib/native. After adding -Djava.library.path pointing at that directory to the run command and executing again, the data was loaded into HBase successfully:
sudo -u hdfs /usr/java/jdk1.8.0_181-cloudera/bin/java \
-Djava.library.path=/opt/cloudera/parcels/CDH-6.3.0-1.cdh6.3.0.p0.1279813/lib/hadoop/lib/native \
-classpath /XXX.jar com.XXX.BulkLoadDataToHBase hdfs://172.168.100.171:8020/distcp_dir/test_d046b9c64abfbb9c67b16e97d0a28d20 tmp_v1
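To confirm that a JVM started with -Djava.library.path can actually see the native Hadoop/Snappy libraries, a small check along the following lines can be run first (a sketch; NativeCodeLoader is the hadoop-common class that appears in the stack trace above):

import org.apache.hadoop.util.NativeCodeLoader

object SnappyCheck {
  def main(args: Array[String]): Unit = {
    // True only if libhadoop was found via java.library.path
    val nativeHadoop = NativeCodeLoader.isNativeCodeLoaded
    println(s"native hadoop loaded: $nativeHadoop")
    // buildSupportsSnappy() is the native method from the stack trace;
    // calling it without libhadoop on the library path throws UnsatisfiedLinkError
    if (nativeHadoop) {
      println(s"snappy supported: ${NativeCodeLoader.buildSupportsSnappy()}")
    }
  }
}

Running hadoop checknative -a on a cluster node reports the same information.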
Along the way I also hit java.lang.NoSuchMethodError: org.slf4j.helpers.MessageFormatter.format(Ljava/lang/String;Ljava/lang/Object;Ljava/lang/Object;)Ljava/lang/String;. This exception is caused by duplicate slf4j jars inside the packaged program; deleting the lower-version jar and keeping the higher-version one resolves it.
In my case there were two duplicate slf4j jars. The error looked like this (a sketch for checking which jar a class is actually loaded from follows the log below):
Jun 30, 2022 3:15:59 PM org.apache.zookeeper.ClientCnxn$SendThread logStartConnect
INFO: Opening socket connection to server bd.vn0038.jmrh.com/172.168.100.171:2181. Will not attempt to authenticate using SASL (unknown error)
Jun 30, 2022 3:15:59 PM org.apache.zookeeper.ClientCnxn$SendThread run
WARNING: Session 0x0 for server bd.vn0038.jmrh.com/172.168.100.171:2181, unexpected error, closing socket connection and attempting reconnect
java.lang.NoSuchMethodError: org.slf4j.helpers.MessageFormatter.format(Ljava/lang/String;Ljava/lang/Object;Ljava/lang/Object;)Ljava/lang/String;
at org.slf4j.impl.JDK14LoggerAdapter.info(JDK14LoggerAdapter.java:326)
at org.apache.zookeeper.ClientCnxn$SendThread.primeConnection(ClientCnxn.java:962)
at org.apache.zookeeper.ClientCnxnSocketNIO.doTransport(ClientCnxnSocketNIO.java:352)
at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1224)
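A quick way to see which jar org.slf4j.helpers.MessageFormatter is actually loaded from (and therefore which of the duplicate slf4j jars wins) is a small sketch like the following, run with the same -classpath as the bulk-load command:

object WhichJar {
  def main(args: Array[String]): Unit = {
    // Prints the location (jar) that the slf4j MessageFormatter class was loaded from;
    // getCodeSource can be null for classes loaded by the bootstrap class loader
    val source = classOf[org.slf4j.helpers.MessageFormatter].getProtectionDomain.getCodeSource
    println(if (source != null) source.getLocation else "bootstrap class loader")
  }
}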
References:
Using Snappy and other compressions with NiFi HDFS components: https://community.cloudera.com/t5/Community-Articles/using-snappy-and-other-compressions-with-Nifi-hdfs/ta-p/248341
ERROR: "java.lang.UnsatisfiedLinkError: org.apache.hadoop.util.NativeCodeLoader.buildSupportsSnappy()Z" in Informatica DEI when running a mapping with a Snappy file source: https://knowledge.informatica.com/s/article/498636?language=en_US
HBase troubleshooting | Investigating and improving HBase bulkload failures: https://www.modb.pro/db/57374