1. Direct import with Sqoop
(1) Create the HBase table
-- 1. If the user table already exists, drop it first
hbase(main):013:0> disable 'tbl_users'
hbase(main):014:0> drop 'tbl_users'
-- Or just truncate it instead
hbase(main):015:0> truncate 'tbl_users'
-- 2. Create the user table with a single column family 'detail'
hbase(main):016:0> create 'tbl_users','detail'
hbase(main):019:0> desc "tbl_users"
Table tbl_users is ENABLED
tbl_users
COLUMN FAMILIES DESCRIPTION
{NAME => 'detail', BLOOMFILTER => 'ROW', VERSIONS => '1', IN_MEMORY => 'false', KEEP_DELETED_CELLS => 'FALSE', DATA_BLOCK_ENCODING => 'NONE', TTL => 'FOREVER', COMPRESSION => 'NONE', MIN_VERSIONS => '0', BLOCKCACHE => 'true', BLOCKSIZE => '65536', REPLICATION_SCOPE => '0'}
(2) Full import into HBase with Sqoop
Sqoop can import the data of a MySQL table into an HBase table; the target HBase table name, column family and RowKey column are specified on the command line:
/export/servers/sqoop/bin/sqoop import \
-D sqoop.hbase.add.row.key=true \
--connect jdbc:mysql://bigdata-cdh01:3306/tags_dat \
--username root \
--password 123456 \
--table tbl_users \
--hbase-create-table \
--hbase-table tbl_users \
--column-family detail \
--hbase-row-key id \
--num-mappers 2
Parameter explanations:
1. -D sqoop.hbase.add.row.key=true: controls whether the row-key column is also written into the column family. It defaults to false, in which case the row-key field does not show up as a column inside the family. Note that this -D option must come right after import (a quick check with the HBase client API is sketched after this list).
2. --hbase-create-table: create the HBase table if it does not exist yet
3. --hbase-table: name of the target HBase table
4. --hbase-row-key: column used as the HBase row key (mind the format)
5. --column-family: column family of the HBase table
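As a quick sanity check of the sqoop.hbase.add.row.key=true behaviour, the sketch below (Scala, HBase client API) reads one row back and looks for the id column inside the detail family. It is only a sketch: it assumes hbase-site.xml is on the classpath and that a user with id 1 exists in tbl_users; the object name is made up.

import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.{ConnectionFactory, Get}
import org.apache.hadoop.hbase.util.Bytes

object CheckRowKeyField {
  def main(args: Array[String]): Unit = {
    val conf = HBaseConfiguration.create()                // picks up hbase-site.xml from the classpath
    val conn = ConnectionFactory.createConnection(conf)
    try {
      val table = conn.getTable(TableName.valueOf("tbl_users"))
      val result = table.get(new Get(Bytes.toBytes("1"))) // "1" is an assumed sample id
      // With sqoop.hbase.add.row.key=true the id column is stored in 'detail' as well
      val id = result.getValue(Bytes.toBytes("detail"), Bytes.toBytes("id"))
      println(s"detail:id = ${if (id == null) "<missing>" else Bytes.toString(id)}")
      table.close()
    } finally {
      conn.close()
    }
  }
}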
(3) Incremental import into HBase with Sqoop
/export/servers/sqoop/bin/sqoop import \
-D sqoop.hbase.add.row.key=true \
--connect jdbc:mysql://bigdata-cdh01.itcast.cn:3306/tags_dat \
--username root \
--password 123456 \
--table tbl_logs \
--hbase-create-table \
--hbase-table tag_logs \
--column-family detail \
--hbase-row-key id \
--num-mappers 20 \
--incremental lastmodified \
--check-column log_time \
--last-value '2019-08-13 00:00:00'
Incremental import parameters:
1. --incremental: incremental import supports two modes, append (based on a strictly increasing column) and lastmodified (based on a timestamp column).
2. --check-column: the column Sqoop inspects to decide which rows are new or updated.
3. --last-value: the lower bound of the check column; in this example rows with log_time from 2019-08-13 00:00:00 up to now are imported.
Note: importing data into an HBase table with Sqoop has one limitation:
a single column of the RDBMS table must be chosen as the HBase ROWKEY. If the row key should be a combination of several fields, it cannot be expressed this way, so this approach is not always usable.
2. HBase's built-in tool: ImportTSV
HBase ImportTSV loads text data in TSV format (or CSV, i.e. any delimiter-separated fields per line) into an HBase table.
(1) Loading via the Put path
With the Put path, each record follows the normal HBase write flow: Put -> WAL (write-ahead log) -> MemStore (in memory); once the MemStore reaches a certain size it is flushed to disk as HFiles.
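For reference, a single client-side Put through the HBase client API (shown here in Scala) looks like the sketch below; ImportTSV's default mode issues Puts along this same write path. The row key '10001' and the detail:username column are made-up sample values, and hbase-site.xml is assumed to be on the classpath.

import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.{ConnectionFactory, Put}
import org.apache.hadoop.hbase.util.Bytes

object SinglePutExample {
  def main(args: Array[String]): Unit = {
    val conf = HBaseConfiguration.create()
    val conn = ConnectionFactory.createConnection(conf)
    try {
      val table = conn.getTable(TableName.valueOf("tbl_users"))
      // Each Put travels through the WAL and the MemStore before being flushed to an HFile
      val put = new Put(Bytes.toBytes("10001"))           // assumed sample row key
      put.addColumn(Bytes.toBytes("detail"), Bytes.toBytes("username"), Bytes.toBytes("test_user"))
      table.put(put)
      table.close()
    } finally {
      conn.close()
    }
  }
}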
1) First import the data into a Hive table
Use Sqoop to import the data of the MySQL table into a Hive table (which is in essence just files stored on HDFS):
/export/servers/sqoop/bin/sqoop import \
--connect jdbc:mysql://bigdata-cdh01:3306/tags_dat \
--username root \
--password 123456 \
--table tbl_users \
--direct \
--hive-overwrite \
--delete-target-dir \
--fields-terminated-by '\t' \
--lines-terminated-by '\n' \
--hive-table tags_dat.tbl_users \
--hive-import \
--num-mappers 1
2) Load from the Hive table into HBase
HADOOP_HOME=/export/servers/hadoop
HBASE_HOME=/export/servers/hbase
HADOOP_CLASSPATH=`${HBASE_HOME}/bin/hbase mapredcp`:${HBASE_HOME}/conf \
${HADOOP_HOME}/bin/yarn jar ${HBASE_HOME}/lib/hbase-server-1.2.0-cdh5.14.0.jar \
importtsv \
-Dimporttsv.columns=HBASE_ROW_KEY,detail:log_id,detail:remote_ip,detail:site_global_ticket,detail:site_global_session,detail:global_user_id,detail:cookie_text,detail:user_agent,detail:ref_url,detail:loc_url,detail:log_time \
tbl_logs \
/user/hive/warehouse/tags_dat.db/tbl_logs
(2) Write the data directly as HFile files, then load them into the HBase table
HADOOP_HOME=/export/servers/hadoop
HBASE_HOME=/export/servers/hbase
HADOOP_CLASSPATH=`${HBASE_HOME}/bin/hbase mapredcp`:${HBASE_HOME}/conf \
${HADOOP_HOME}/bin/yarn jar ${HBASE_HOME}/lib/hbase-server-1.2.0-cdh5.14.0.jar \
importtsv \
-Dimporttsv.bulk.output=hdfs://bigdata-cdh01:8020/datas/output_hfile/tbl_logs \
-Dimporttsv.columns=HBASE_ROW_KEY,detail:log_id,detail:remote_ip,detail:site_global_ticket,detail:site_global_session,detail:global_user_id,detail:cookie_text,detail:user_agent,detail:ref_url,detail:loc_url,detail:log_time \
tbl_logs \
/user/hive/warehouse/tags_dat.db/tbl_logs
HADOOP_CLASSPATH=`${HBASE_HOME}/bin/hbase mapredcp`:${HBASE_HOME}/conf \
${HADOOP_HOME}/bin/yarn jar ${HBASE_HOME}/lib/hbase-server-1.2.0-cdh5.14.0.jar \
completebulkload \
hdfs://bigdata-cdh01:8020/datas/output_hfile/tbl_logs \
tbl_logs
Notes:
1) The ROWKEY cannot be a composite key; it must be a single field.
2) When the table has many columns, writing the -Dimporttsv.columns value by hand is tedious and error-prone (a helper sketch follows below).
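One way to sidestep note 2) is to generate the -Dimporttsv.columns value instead of typing it. The small helper below (a hypothetical utility, not part of ImportTSV) builds the string from a field list; for the tbl_logs fields it prints exactly the value used in the commands above.

object ImportTsvColumns {
  // Prepends HBASE_ROW_KEY and prefixes every remaining field with the column family
  def build(family: String, fields: Seq[String]): String =
    ("HBASE_ROW_KEY" +: fields.map(f => s"$family:$f")).mkString(",")

  def main(args: Array[String]): Unit = {
    val fields = Seq("log_id", "remote_ip", "site_global_ticket", "site_global_session",
      "global_user_id", "cookie_text", "user_agent", "ref_url", "loc_url", "log_time")
    // Prints: HBASE_ROW_KEY,detail:log_id,detail:remote_ip,...,detail:log_time
    println(build("detail", fields))
  }
}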
3. HBase BulkLoad
When a large amount of data has to be written to HBase, there are generally two options: the Put path and BulkLoad.
1. The Put path inserts rows one at a time: each mutation is first recorded in the WAL, then placed into the MemStore, and when the MemStore fills up it is flushed to disk as HFiles. This write path triggers flushes, splits and compactions, which can destabilize RegionServers, slow the import down and consume a lot of resources; for massive imports the best way to avoid these problems is to load the data with BulkLoad.
2. BulkLoad exploits the fact that HBase stores its data as HFiles on HDFS: a MapReduce job writes HFiles in that format directly, and the RegionServers then simply move the finished files into the corresponding Region directories.
(1) Writing the MapReduce program
package com.yyds.tags.mr.etl;
import org.apache.hadoop.hbase.util.Bytes;
import java.util.ArrayList;
import java.util.List;
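// Shared constants for the bulk-load job: HDFS input/output paths, target HBase table, column family, and the column order of tbl_logs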
interface Constants {
String INPUT_PATH = "hdfs://bigdata-cdh01:8020/user/hive/warehouse/tags_dat.db/tbl_logs";
String HFILE_PATH = "hdfs://bigdata-cdh01:8020/datas/output_hfile/tbl_logs";
String TABLE_NAME = "tbl_logs";
byte[] COLUMN_FAMILY = Bytes.toBytes("detail");
List<byte[]> list = new ArrayList<byte[]>() {
private static final long serialVersionUID = -6125158551837044300L;
{
add(Bytes.toBytes("id"));
add(Bytes.toBytes("log_id"));
add(Bytes.toBytes("remote_ip"));
add(Bytes.toBytes("site_global_ticket"));
add(Bytes.toBytes("site_global_session"));
add(Bytes.toBytes("global_user_id"));
add(Bytes.toBytes("cookie_text"));
add(Bytes.toBytes("user_agent"));
add(Bytes.toBytes("ref_url"));
add(Bytes.toBytes("loc_url"));
add(Bytes.toBytes("log_time"));
}
};
}
package com.yyds.tags.mr.etl;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import java.io.IOException;
public class LoadLogsToHBaseMapReduce extends Configured implements Tool {
private static Connection connection = null;
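// Mapper: parses each tab-separated line exported from Hive and turns it into a Put keyed by the first field (id)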
static class LoadLogsToHBase extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put> {
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String[] split = value.toString().split("\\t");
if (split.length == Constants.list.size()) {
Put put = new Put(Bytes.toBytes(split[0]));
for (int i = 1; i < Constants.list.size(); i++) {
put.addColumn(
Constants.COLUMN_FAMILY,
Constants.list.get(i),
Bytes.toBytes(split[i])
);
}
context.write(new ImmutableBytesWritable(put.getRow()), put);
}
}
}
@Override
public int run(String[] strings) throws Exception {
Configuration configuration = super.getConf() ;
Job job = Job.getInstance(configuration);
job.setJobName(this.getClass().getSimpleName());
job.setJarByClass(LoadLogsToHBaseMapReduce.class);
FileInputFormat.addInputPath(job, new Path(Constants.INPUT_PATH));
job.setMapperClass(LoadLogsToHBase.class);
job.setMapOutputKeyClass(ImmutableBytesWritable.class);
job.setMapOutputValueClass(Put.class);
job.setOutputFormatClass(HFileOutputFormat2.class);
// Delete any previous HFile output directory so the job can write a fresh one
FileSystem hdfs = FileSystem.get(configuration);
Path outputPath = new Path(Constants.HFILE_PATH);
if (hdfs.exists(outputPath)) {
hdfs.delete(outputPath, true);
}
FileOutputFormat.setOutputPath(job, outputPath);
Table table = connection.getTable(TableName.valueOf(Constants.TABLE_NAME));
// configureIncrementalLoad sets up the TotalOrderPartitioner and cell sorting
// so that the reducers produce HFiles aligned with the table's region boundaries
HFileOutputFormat2.configureIncrementalLoad(
job,
table,
connection.getRegionLocator(TableName.valueOf(Constants.TABLE_NAME)));
boolean isSuccess = job.waitForCompletion(true);
return isSuccess ? 0 : 1;
}
public static void main(String[] args) throws Exception {
Configuration configuration = HBaseConfiguration.create();
connection = ConnectionFactory.createConnection(configuration);
int status = ToolRunner.run(configuration, new LoadLogsToHBaseMapReduce(), args);
System.out.println("HFile文件生成完毕!~~~");
if (0 == status) {
Admin admin = connection.getAdmin();
Table table = connection.getTable(TableName.valueOf(Constants.TABLE_NAME));
LoadIncrementalHFiles load = new LoadIncrementalHFiles(configuration);
load.doBulkLoad(
new Path(Constants.HFILE_PATH),
admin,
table,
connection.getRegionLocator(TableName.valueOf(Constants.TABLE_NAME))
);
System.out.println("HFile文件移动完毕!~~~");
}
}
}
(2) Writing the Spark program
package com.yyds.tags.mr.etl.hfile
import scala.collection.immutable.TreeMap
object TableFieldNames{
val LOG_FIELD_NAMES: TreeMap[String, Int] = TreeMap(
("id", 0),
("log_id", 1),
("remote_ip", 2),
("site_global_ticket", 3),
("site_global_session", 4),
("global_user_id", 5),
("cookie_text", 6),
("user_agent", 7),
("ref_url", 8),
("loc_url", 9),
("log_time", 10)
)
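// The field-name maps for the other tables (GOODS_FIELD_NAMES, USER_FIELD_NAMES, ORDER_FIELD_NAMES),
// referenced by HBaseBulkLoader below, are defined in the same way and are omitted here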
}
package com.yyds.tags.mr.etl.hfile
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.hbase.client.{ConnectionFactory, Table}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.{HFileOutputFormat2, LoadIncrementalHFiles, TableOutputFormat}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.{HBaseConfiguration, KeyValue, TableName}
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
import scala.collection.immutable.TreeMap
object HBaseBulkLoader {
def getLineToData(line: String, family: String, fieldNames: TreeMap[String, Int]): List[(ImmutableBytesWritable, KeyValue)] = {
val length = fieldNames.size
val fieldValues: Array[String] = line.split("\\t", -1)
if (null == fieldValues || fieldValues.length != length) return Nil
val id: String = fieldValues(0)
val rowKey = Bytes.toBytes(id)
val ibw: ImmutableBytesWritable = new ImmutableBytesWritable(rowKey)
val columnFamily: Array[Byte] = Bytes.toBytes(family)
fieldNames.toList.map { case (fieldName, fieldIndex) =>
val keyValue = new KeyValue(
rowKey,
columnFamily,
Bytes.toBytes(fieldName),
Bytes.toBytes(fieldValues(fieldIndex))
)
(ibw, keyValue)
}
}
def main(args: Array[String]): Unit = {
if (args.length != 5) {
println("Usage: required params: <DataType> <HBaseTable> <Family> <InputDir> <OutputDir>")
System.exit(-1)
}
val Array(dataType, tableName, family, inputDir, outputDir) = args
val fieldNames = dataType.toInt match {
case 1 => TableFieldNames.LOG_FIELD_NAMES
case 2 => TableFieldNames.GOODS_FIELD_NAMES
case 3 => TableFieldNames.USER_FIELD_NAMES
case 4 => TableFieldNames.ORDER_FIELD_NAMES
}
val sc: SparkContext = {
val sparkConf = new SparkConf()
.setAppName(this.getClass.getSimpleName.stripSuffix("$"))
.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
SparkContext.getOrCreate(sparkConf)
}
val keyValuesRDD: RDD[(ImmutableBytesWritable, KeyValue)] = sc.textFile(inputDir)
.filter(line => null != line)
.flatMap { line => getLineToData(line, family, fieldNames) }
.sortByKey()
val conf: Configuration = HBaseConfiguration.create()
conf.set("hbase.mapreduce.hfileoutputformat.table.name", tableName)
val dfs = FileSystem.get(conf)
val outputPath: Path = new Path(outputDir)
if (dfs.exists(outputPath)) {
dfs.delete(outputPath, true)
}
dfs.close()
val conn = ConnectionFactory.createConnection(conf)
val htableName = TableName.valueOf(tableName)
val table: Table = conn.getTable(htableName)
HFileOutputFormat2.configureIncrementalLoad(
Job.getInstance(conf),
table,
conn.getRegionLocator(htableName)
)
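// HFileOutputFormat2 requires cells in strictly sorted order, so sort by row key (and full cell key) before writing the HFiles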
keyValuesRDD
.sortBy(x => (x._1, x._2.getKeyString), true)
.saveAsNewAPIHadoopFile(
outputDir,
classOf[ImmutableBytesWritable],
classOf[KeyValue],
classOf[HFileOutputFormat2],
conf
)
val load = new LoadIncrementalHFiles(conf)
load.doBulkLoad(outputPath, conn.getAdmin, table,
conn.getRegionLocator(htableName))
sc.stop()
}
}