IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> scala版本spark将HDFS上的数据同步到hbase -> 正文阅读

[大数据]scala版本spark将HDFS上的数据同步到hbase

1 spark将HDFS上的数据同步到hbase

我们也知道hbase底层的数据源是hFile。
将hdfs数据转换为hfile, 通过bukload快速导入hbase ,当然里面有很多坑.
比如 : 版本不一致.
还有就是本地版本和集群版本不一致导致class不存在.写hbase代码最好是使用java和scala。我这里使用的是spark2.4 + hbase 2.1 切记不同版本使用的方法不一样。
我这里也保留了一些老的class希望对你有用。

2 代码

话不多说直接上代码。我这里只是同步,所以都是写在了一个类上面。

package com.test.task

import java.net.URI

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.hbase.client.{ConnectionFactory, Result, Table}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.{HFileOutputFormat2, LoadIncrementalHFiles, TableOutputFormat}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase._
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.{SparkConf, SparkContext}

/**
 * Author: say
 * Description:
 * Create: 2021/5/1 13:14
 */
object Hdfs2Hbase {

    var cdhPath = ""
    var zookeeperQuorum = ""
    var dataSourcePath = ""
    var hdfsRootPath = ""
    var hFilePath = ""
    val tableName = "api:real_time_label"
    val familyName = "baseInfo"

    def main(args: Array[String]): Unit = {

        //设置用户
        System.setProperty("HADOOP_USER_NAME", "say")

        //  运行shell 传参执行环境
        //  生产运行记得设置运行参数
        if (args.length >= 1) {
            println("设置参数,运行环境:"+args(0))
            if ("online".equals(args(0))) {
                cdhPath = "hdfs://say-hdfs-cluster"
                zookeeperQuorum = "192.168.1.101:2181,192.168.1.102:2181,192.168.1.103:2181"
                dataSourcePath = cdhPath+"/user/say/hbase/txt/"
                hdfsRootPath = cdhPath+"/user/say/"
                hFilePath = cdhPath+"/user/say/hbase/hfile"
            } else {
                cdhPath = "hdfs://say-cdh-master02.net:8020"
                zookeeperQuorum = "192.168.2.101:2181,192.168.2.102:2181,192.168.2.103:2181"
                dataSourcePath = cdhPath+"/user/say/hbase/txt/"
                hdfsRootPath = cdhPath+"/user/say/"
                hFilePath = cdhPath+"/user/say/hbase/hfile"
            }
        } else {
            println("运行环境: test")
            cdhPath = "hdfs://say-cdh-master02.net:8020"
            zookeeperQuorum = "192.168.1.101:2181,192.168.1.102:2181,192.168.1.103:2181"
            dataSourcePath = cdhPath+"/user/say/hbase/txt/"
            hdfsRootPath = cdhPath+"/user/say/"
            hFilePath = cdhPath+"/user/say/hbase/hfile"
        }

        val sparkConf = new SparkConf()
          .setAppName("hive2Hbase")
//          .setMaster("local[*]")    //本地运行打开,也可以设置参数 ,记得设置运行参数
        sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
        sparkConf.registerKryoClasses(Array(classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable]))

        val sc = new SparkContext(sparkConf)
        val hadoopConf = new Configuration()
        hadoopConf.set("fs.defaultFS", hdfsRootPath)
        hadoopConf.set("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem")
        val fileSystem = FileSystem.get(new URI(cdhPath), hadoopConf)
        val hbaseConf = HBaseConfiguration.create(hadoopConf)
        println("我在这里")
        hbaseConf.set(HConstants.ZOOKEEPER_QUORUM, zookeeperQuorum)
        hbaseConf.set(TableOutputFormat.OUTPUT_TABLE, tableName)
        val hbaseConn = ConnectionFactory.createConnection(hbaseConf)

        val admin = hbaseConn.getAdmin
        println("连接成功啦~")

        // 0. 准备程序运行的环境
        // 如果 HBase 表不存在,就创建一个新表
        if (!admin.tableExists(TableName.valueOf(tableName))) {
            val desc = new HTableDescriptor(TableName.valueOf(tableName))
            val hcd = new HColumnDescriptor(familyName)
            desc.addFamily(hcd)
            admin.createTable(desc)
        }
        // 如果存放 HFile文件的路径已经存在,就删除掉
        if (fileSystem.exists(new Path(hFilePath))) {
            fileSystem.delete(new Path(hFilePath), true)
        }

        // 1. 清洗需要存放到 HFile 中的数据,rowKey 一定要排序,否则会报错:
        // java.io.IOException: Added a key not lexically larger than previous.

        val data = sc.textFile(dataSourcePath)
          .map(str => {
              val valueStr: Array[String] = str.split("\\|")
              val rowkey = valueStr(0)
              // 处理数据的逻辑
              (rowkey, valueStr)
          })
          .map(tuple => {
              val kv = new KeyValue(Bytes.toBytes(tuple._1), Bytes.toBytes(familyName), Bytes.toBytes(tuple._2(1)), Bytes.toBytes(tuple._2(2)))
              (new ImmutableBytesWritable(Bytes.toBytes(tuple._1)), kv)
          })

        // 2. Save Hfiles on HDFS
        val table: Table = hbaseConn.getTable(TableName.valueOf(tableName))
        val job = Job.getInstance(hbaseConf)
        job.setMapOutputKeyClass(classOf[ImmutableBytesWritable])
        job.setMapOutputValueClass(classOf[KeyValue])
        val regionLocator = hbaseConn.getRegionLocator(TableName.valueOf(tableName))
//        println(table.getTableDescriptor)
//        println(table)

        HFileOutputFormat2.configureIncrementalLoadMap(job, table.getTableDescriptor)

        val jobConfiguration = job.getConfiguration
        jobConfiguration.set("mapreduce.output.fileoutputformat.outputdir", hFilePath)
        data
          .sortBy(x => (x._1, x._2.getKeyString), ascending = true)
          .saveAsNewAPIHadoopDataset(jobConfiguration)
        println("成功生成HFILE")
//这里就是版本不同而出现的问题.
//        filedata.saveAsNewAPIHadoopFile(hFilePath,classOf[ImmutableBytesWritable],Class[KeyValue],Class[HFileOutputFormat2],hbaseConf)
//        filedata.saveAsNewAPIHadoopFile(
//              hFilePath,
//              classOf[ImmutableBytesWritable],
//              classOf[KeyValue],
//              classOf[HFileOutputFormat2],
//              hadoopConf
//          )

        //  3. Bulk load Hfiles to Hbase
        val bulkLoader = new LoadIncrementalHFiles(hbaseConf)

        bulkLoader.doBulkLoad(new Path(hFilePath), admin, table, regionLocator)

        hbaseConn.close()
        fileSystem.close()
        sc.stop()
    }
}

3 pom

为防止大家也和我出现版本冲突 而导致not found class 的问题.我给大家贴出pom.xml .我有很多注释,也是防止不一致,你就单独使用依赖 .可能还需要建一个build目录.

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.test.task</groupId>
    <artifactId>Hdfs2Hbase</artifactId>
    <version>1.0-SNAPSHOT</version>


    <properties>
        <spark.version>2.4.0</spark.version>
        <scala.version>2.11.12</scala.version>
        <hbase.version>2.1.0</hbase.version>
        <hadoop.version>3.0.0</hadoop.version>
    </properties>

    <dependencies>

        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>
<!--        <dependency>-->
<!--            <groupId>org.scala-lang</groupId>-->
<!--            <artifactId>scala-reflect</artifactId>-->
<!--            <version>${scala.version}</version>-->
<!--        </dependency>-->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>${spark.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>org.apache.hadoop</groupId>
                    <artifactId>hadoop-client</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.apache.hadoop</groupId>
                    <artifactId>hadoop-hdfs</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-it</artifactId>
            <version>${hbase.version}</version>
        </dependency>
<!--        <dependency>-->
<!--            <groupId>org.apache.hbase</groupId>-->
<!--            <artifactId>hbase-common</artifactId>-->
<!--            <version>${hbase.version}</version>-->
<!--        </dependency>-->
<!--        <dependency>-->
<!--            <groupId>org.apache.hbase</groupId>-->
<!--            <artifactId>hbase-server</artifactId>-->
<!--            <version>${hbase.version}</version>-->
<!--        </dependency>-->
<!--        <dependency>-->
<!--            <groupId>org.apache.hbase</groupId>-->
<!--            <artifactId>hbase-client</artifactId>-->
<!--            <version>${hbase.version}</version>-->
<!--            <exclusions>-->
<!--                <exclusion>-->
<!--                    <groupId>log4j</groupId>-->
<!--                    <artifactId>log4j</artifactId>-->
<!--                </exclusion>-->
<!--                <exclusion>-->
<!--                    <groupId>org.codehaus.jackson</groupId>-->
<!--                    <artifactId>jackson-core-asl</artifactId>-->
<!--                </exclusion>-->
<!--                <exclusion>-->
<!--                    <groupId>org.slf4j</groupId>-->
<!--                    <artifactId>slf4j-log4j12</artifactId>-->
<!--                </exclusion>-->
<!--            </exclusions>-->
<!--        </dependency>-->

        <!-- 指定hadoop-client API的版本 -->
<!--        <dependency>-->
<!--            <groupId>org.apache.hadoop</groupId>-->
<!--            <artifactId>hadoop-client</artifactId>-->
<!--            <version>${hadoop.version}</version>-->
<!--        </dependency>-->
<!--        <dependency>-->
<!--            <groupId>org.apache.hadoop</groupId>-->
<!--            <artifactId>hadoop-common</artifactId>-->
<!--            <version>${hadoop.version}</version>-->
<!--        </dependency>-->
    </dependencies>


    <build>
        <sourceDirectory>src/main/scala</sourceDirectory>
        <testSourceDirectory>src/test/scala</testSourceDirectory>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.5.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <encoding>UTF-8</encoding>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.scala-tools</groupId>
                <artifactId>maven-scala-plugin</artifactId>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
                <configuration>
                    <scalaVersion>${scala.version}</scalaVersion>
                    <args>
                        <arg>-target:jvm-1.8</arg>
                    </args>
                    <jvmArgs>
                        <jvmArg>-Xss2048K</jvmArg>
                    </jvmArgs>
                </configuration>

            </plugin>


            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-dependency-plugin</artifactId>
                <executions>
                    <execution>
                        <id>copy-dependencies</id>
                        <phase>prepare-package</phase>
                        <goals>
                            <goal>copy-dependencies</goal>
                        </goals>
                        <configuration>
                            <outputDirectory>${project.build.directory}/lib</outputDirectory>
                            <overWriteReleases>false</overWriteReleases>
                            <overWriteSnapshots>false</overWriteSnapshots>
                            <overWriteIfNewer>true</overWriteIfNewer>
                        </configuration>
                    </execution>
                </executions>
            </plugin>

            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                    <archive>
                        <manifest>
                            <mainClass></mainClass>
                        </manifest>
                    </archive>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <appendAssemblyId>false</appendAssemblyId>
                    <finalName>${project.artifactId}-${ver}</finalName>
                    <descriptors>
                        <descriptor>src/build/package.xml</descriptor>
                    </descriptors>
                </configuration>
                <executions>
                    <execution>
                        <id>single</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-antrun-plugin</artifactId>
                <version>1.8</version>
                <executions>
                    <execution>
                        <id>del-lib-jar</id>
                        <phase>package</phase>
                        <goals>
                            <goal>run</goal>
                        </goals>
                        <configuration>
                            <tasks>
                                <echo>delete lib jar...</echo>
                                <!-- <delete dir="src/main/webapp/WEB-INF/lib"/> -->
                                <delete>
                                    <fileset dir="target" includes="${project.artifactId}-*.jar"/>
                                </delete>
                            </tasks>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
        <resources>
            <resource>
                <directory>src/main/resources</directory>
                <includes>
                    <include>**/*</include>
                </includes>
            </resource>
            <resource>
                <directory>bin/${env}</directory>
                <includes>
                    <include>**/*.sh</include>
                </includes>
                <filtering>false</filtering>
            </resource>
            <resource>
                <directory>src/main/resources/env/${env}</directory>
            </resource>
            <resource>
                <directory>src/main/scala</directory>
                <includes>
                    <include>**/*.*</include>
                </includes>
                <filtering>false</filtering>
            </resource>
        </resources>
    </build>
    <reporting>
        <plugins>
            <plugin>
                <groupId>org.scala-tools</groupId>
                <artifactId>maven-scala-plugin</artifactId>
                <configuration>
                    <scalaVersion>${scala.version}</scalaVersion>
                </configuration>
            </plugin>
        </plugins>
    </reporting>
    <profiles>
        <profile>
            <id>prd</id>
            <activation>
                <activeByDefault>true</activeByDefault>
            </activation>
            <properties>
                <env>prd</env>
                <ver>2.0.0-prd</ver>
            </properties>
        </profile>
        <profile>
            <id>dev</id>
            <properties>
                <env>dev</env>
                <ver>1.0.0-SNAPSHOT</ver>
            </properties>
        </profile>
        <profile>
            <id>test</id>
            <properties>
                <env>test</env>
                <ver>1.0.0-SNAPSHOT</ver>
            </properties>
        </profile>
        <profile>
            <id>rc</id>
            <properties>
                <env>rc</env>
                <ver>1.0.0-SNAPSHOT</ver>
            </properties>
        </profile>
    </profiles>

    <repositories>
        <repository>
            <id>aliyun</id>
            <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
            <releases>
                <enabled>true</enabled>
            </releases>
            <snapshots>
                <enabled>false</enabled>
            </snapshots>
        </repository>
    </repositories>

</project>
  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2022-03-12 17:36:19  更:2022-03-12 17:40:29 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年11日历 -2024/11/24 9:12:26-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码