1 Syncing data from HDFS into HBase with Spark
As we know, HBase's underlying storage format is the HFile. The idea is to convert the data on HDFS into HFiles and then bulk-load them into HBase quickly. There are plenty of pitfalls along the way, e.g. version mismatches, and "class not found" errors at runtime because the local version differs from the cluster version. HBase code is best written in Java or Scala. I am using Spark 2.4 + HBase 2.1; keep in mind that different versions expose different APIs. I have also kept some of the older classes here in case they are useful to you.
2 Code
Without further ado, here is the code. Since this is just a one-off sync job, everything lives in a single class.
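The job assumes the source text files are pipe-delimited, with the row key in the first field, the column qualifier in the second, and the cell value in the third. As a quick standalone sketch (the sample line below is made up for illustration), one input line maps to a single HBase cell like this:

object InputLineExample {
  def main(args: Array[String]): Unit = {
    // Hypothetical sample line; real files only need to follow the same rowkey|qualifier|value layout
    val line = "user_0001|age|28"
    val fields = line.split("\\|")
    // In the table api:real_time_label this becomes:
    //   row = user_0001, column = baseInfo:age, value = 28
    println(s"row=${fields(0)}, column=baseInfo:${fields(1)}, value=${fields(2)}")
  }
}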
package com.test.task

import java.net.URI

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.hbase.client.{ConnectionFactory, Result, Table}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.{HFileOutputFormat2, LoadIncrementalHFiles, TableOutputFormat}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase._
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.{SparkConf, SparkContext}

object Hdfs2Hbase {

  var cdhPath = ""
  var zookeeperQuorum = ""
  var dataSourcePath = ""
  var hdfsRootPath = ""
  var hFilePath = ""
  val tableName = "api:real_time_label"
  val familyName = "baseInfo"

  def main(args: Array[String]): Unit = {
    System.setProperty("HADOOP_USER_NAME", "say")

    // Pick HDFS and ZooKeeper addresses based on the environment argument
    if (args.length >= 1) {
      println("Environment argument received, running against: " + args(0))
      if ("online".equals(args(0))) {
        cdhPath = "hdfs://say-hdfs-cluster"
        zookeeperQuorum = "192.168.1.101:2181,192.168.1.102:2181,192.168.1.103:2181"
        dataSourcePath = cdhPath + "/user/say/hbase/txt/"
        hdfsRootPath = cdhPath + "/user/say/"
        hFilePath = cdhPath + "/user/say/hbase/hfile"
      } else {
        cdhPath = "hdfs://say-cdh-master02.net:8020"
        zookeeperQuorum = "192.168.2.101:2181,192.168.2.102:2181,192.168.2.103:2181"
        dataSourcePath = cdhPath + "/user/say/hbase/txt/"
        hdfsRootPath = cdhPath + "/user/say/"
        hFilePath = cdhPath + "/user/say/hbase/hfile"
      }
    } else {
      println("Running environment: test")
      cdhPath = "hdfs://say-cdh-master02.net:8020"
      zookeeperQuorum = "192.168.1.101:2181,192.168.1.102:2181,192.168.1.103:2181"
      dataSourcePath = cdhPath + "/user/say/hbase/txt/"
      hdfsRootPath = cdhPath + "/user/say/"
      hFilePath = cdhPath + "/user/say/hbase/hfile"
    }

    val sparkConf = new SparkConf()
      .setAppName("hive2Hbase")
    sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    sparkConf.registerKryoClasses(Array(classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable]))
    val sc = new SparkContext(sparkConf)

    // HDFS client configuration, used to clean up the HFile staging directory before writing
    val hadoopConf = new Configuration()
    hadoopConf.set("fs.defaultFS", hdfsRootPath)
    hadoopConf.set("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem")
    val fileSystem = FileSystem.get(new URI(cdhPath), hadoopConf)

    // HBase connection
    val hbaseConf = HBaseConfiguration.create(hadoopConf)
    println("I am here") // debug print
    hbaseConf.set(HConstants.ZOOKEEPER_QUORUM, zookeeperQuorum)
    hbaseConf.set(TableOutputFormat.OUTPUT_TABLE, tableName)
    val hbaseConn = ConnectionFactory.createConnection(hbaseConf)
    val admin = hbaseConn.getAdmin
    println("Connected successfully")

    // Create the target table with a single column family if it does not exist yet
    if (!admin.tableExists(TableName.valueOf(tableName))) {
      val desc = new HTableDescriptor(TableName.valueOf(tableName))
      val hcd = new HColumnDescriptor(familyName)
      desc.addFamily(hcd)
      admin.createTable(desc)
    }

    // Remove any leftover HFiles from a previous run
    if (fileSystem.exists(new Path(hFilePath))) {
      fileSystem.delete(new Path(hFilePath), true)
    }

    // Parse the pipe-delimited source files: field 0 is the rowkey,
    // field 1 the column qualifier, field 2 the cell value
    val data = sc.textFile(dataSourcePath)
      .map(str => {
        val valueStr: Array[String] = str.split("\\|")
        val rowkey = valueStr(0)
        (rowkey, valueStr)
      })
      .map(tuple => {
        val kv = new KeyValue(Bytes.toBytes(tuple._1), Bytes.toBytes(familyName), Bytes.toBytes(tuple._2(1)), Bytes.toBytes(tuple._2(2)))
        (new ImmutableBytesWritable(Bytes.toBytes(tuple._1)), kv)
      })

    val table: Table = hbaseConn.getTable(TableName.valueOf(tableName))
    val job = Job.getInstance(hbaseConf)
    job.setMapOutputKeyClass(classOf[ImmutableBytesWritable])
    job.setMapOutputValueClass(classOf[KeyValue])
    val regionLocator = hbaseConn.getRegionLocator(TableName.valueOf(tableName))
    HFileOutputFormat2.configureIncrementalLoadMap(job, table.getTableDescriptor)
    val jobConfiguration = job.getConfiguration
    jobConfiguration.set("mapreduce.output.fileoutputformat.outputdir", hFilePath)

    // HFileOutputFormat2 requires cells in sorted order, so sort by (rowkey, KeyValue key)
    // before writing the HFiles
    data
      .sortBy(x => (x._1, x._2.getKeyString), ascending = true)
      .saveAsNewAPIHadoopDataset(jobConfiguration)
    println("HFiles generated successfully")

    // Hand the generated HFiles over to HBase with a bulk load
    val bulkLoader = new LoadIncrementalHFiles(hbaseConf)
    bulkLoader.doBulkLoad(new Path(hFilePath), admin, table, regionLocator)

    hbaseConn.close()
    fileSystem.close()
    sc.stop()
  }
}
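After the bulk load finishes, it is worth spot-checking a row through the regular HBase client API. The sketch below is a minimal standalone check, not part of the job itself; the row key "user_0001" and qualifier "age" are placeholders, and the ZooKeeper quorum must match the environment you actually loaded into:

import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.{ConnectionFactory, Get}
import org.apache.hadoop.hbase.util.Bytes

object BulkLoadCheck {
  def main(args: Array[String]): Unit = {
    val conf = HBaseConfiguration.create()
    // Use the same quorum as the load job
    conf.set("hbase.zookeeper.quorum", "192.168.1.101:2181,192.168.1.102:2181,192.168.1.103:2181")
    val conn = ConnectionFactory.createConnection(conf)
    try {
      val table = conn.getTable(TableName.valueOf("api:real_time_label"))
      // Placeholder row key and qualifier; use values you know exist in the source data
      val result = table.get(new Get(Bytes.toBytes("user_0001")))
      val value = result.getValue(Bytes.toBytes("baseInfo"), Bytes.toBytes("age"))
      println(if (value == null) "row not found" else Bytes.toString(value))
      table.close()
    } finally {
      conn.close()
    }
  }
}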
3 pom
To keep you from hitting the same version conflicts and the resulting "class not found" errors I did, here is my pom.xml. I had quite a few things commented out, again to avoid version mismatches, so feel free to pull in just the dependencies you need. You may also need to create a build directory (the second assembly execution below references src/build/package.xml).
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.test.task</groupId>
    <artifactId>Hdfs2Hbase</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <spark.version>2.4.0</spark.version>
        <scala.version>2.11.12</scala.version>
        <hbase.version>2.1.0</hbase.version>
        <hadoop.version>3.0.0</hadoop.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>${spark.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>org.apache.hadoop</groupId>
                    <artifactId>hadoop-client</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.apache.hadoop</groupId>
                    <artifactId>hadoop-hdfs</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-it</artifactId>
            <version>${hbase.version}</version>
        </dependency>
    </dependencies>

    <build>
        <sourceDirectory>src/main/scala</sourceDirectory>
        <testSourceDirectory>src/test/scala</testSourceDirectory>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.5.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <encoding>UTF-8</encoding>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.scala-tools</groupId>
                <artifactId>maven-scala-plugin</artifactId>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
                <configuration>
                    <scalaVersion>${scala.version}</scalaVersion>
                    <args>
                        <arg>-target:jvm-1.8</arg>
                    </args>
                    <jvmArgs>
                        <jvmArg>-Xss2048K</jvmArg>
                    </jvmArgs>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-dependency-plugin</artifactId>
                <executions>
                    <execution>
                        <id>copy-dependencies</id>
                        <phase>prepare-package</phase>
                        <goals>
                            <goal>copy-dependencies</goal>
                        </goals>
                        <configuration>
                            <outputDirectory>${project.build.directory}/lib</outputDirectory>
                            <overWriteReleases>false</overWriteReleases>
                            <overWriteSnapshots>false</overWriteSnapshots>
                            <overWriteIfNewer>true</overWriteIfNewer>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                    <archive>
                        <manifest>
                            <mainClass></mainClass>
                        </manifest>
                    </archive>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <appendAssemblyId>false</appendAssemblyId>
                    <finalName>${project.artifactId}-${ver}</finalName>
                    <descriptors>
                        <descriptor>src/build/package.xml</descriptor>
                    </descriptors>
                </configuration>
                <executions>
                    <execution>
                        <id>single</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-antrun-plugin</artifactId>
                <version>1.8</version>
                <executions>
                    <execution>
                        <id>del-lib-jar</id>
                        <phase>package</phase>
                        <goals>
                            <goal>run</goal>
                        </goals>
                        <configuration>
                            <tasks>
                                <echo>delete lib jar...</echo>
                                <delete>
                                    <fileset dir="target" includes="${project.artifactId}-*.jar"/>
                                </delete>
                            </tasks>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
        <resources>
            <resource>
                <directory>src/main/resources</directory>
                <includes>
                    <include>**/*</include>
                </includes>
            </resource>
            <resource>
                <directory>bin/${env}</directory>
                <includes>
                    <include>**/*.sh</include>
                </includes>
                <filtering>false</filtering>
            </resource>
            <resource>
                <directory>src/main/resources/env/${env}</directory>
            </resource>
            <resource>
                <directory>src/main/scala</directory>
                <includes>
                    <include>**/*.*</include>
                </includes>
                <filtering>false</filtering>
            </resource>
        </resources>
    </build>

    <reporting>
        <plugins>
            <plugin>
                <groupId>org.scala-tools</groupId>
                <artifactId>maven-scala-plugin</artifactId>
                <configuration>
                    <scalaVersion>${scala.version}</scalaVersion>
                </configuration>
            </plugin>
        </plugins>
    </reporting>

    <profiles>
        <profile>
            <id>prd</id>
            <activation>
                <activeByDefault>true</activeByDefault>
            </activation>
            <properties>
                <env>prd</env>
                <ver>2.0.0-prd</ver>
            </properties>
        </profile>
        <profile>
            <id>dev</id>
            <properties>
                <env>dev</env>
                <ver>1.0.0-SNAPSHOT</ver>
            </properties>
        </profile>
        <profile>
            <id>test</id>
            <properties>
                <env>test</env>
                <ver>1.0.0-SNAPSHOT</ver>
            </properties>
        </profile>
        <profile>
            <id>rc</id>
            <properties>
                <env>rc</env>
                <ver>1.0.0-SNAPSHOT</ver>
            </properties>
        </profile>
    </profiles>

    <repositories>
        <repository>
            <id>aliyun</id>
            <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
            <releases>
                <enabled>true</enabled>
            </releases>
            <snapshots>
                <enabled>false</enabled>
            </snapshots>
        </repository>
    </repositories>
</project>