IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> 同步Oracle到Hbase(sparksql+bulkLoad) -> 正文阅读

[大数据]同步Oracle到Hbase(sparksql+bulkLoad)

这里其实是一个实时的项目,接kafka,但是开始需要把oracle的数据同步到Hbase,之前的方案

1.sqoop抽到hbase,特慢

2.sqoop抽到hive,hive建Hbase映射表,再利用sparksql同步到hbase,也挺慢

以下是现在的代码

  private val logger = LoggerFactory.getLogger(jdbcTes.getClass)

  def main(args: Array[String]): Unit = {
    val sparksession = SparkSession.builder()
      .appName("jdbcTest")
      .config("spark.debug.maxToStringFields", "200")
      .getOrCreate()
    //设置序列化方式
    sparksession.conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    sparksession.conf.set("spark.debug.maxToStringFields","100")
    import sparksession.implicits._
    //val user="simple"
    //val password = "sma4444"
    val oracleusername = args(0)//oracle用户名
    val oraclepassword =args(1)//oracle密码
    val conn_str=args(2)//oracle连接串
    val hfilePath = args(3)//hfile文件上传地址
    val oracleTableName = args(4)//要抽取的oracle表名
    val port = args(5)//端口号
    val ip = args(6) //zookeeperip,以,分隔
    val hadoopUserName=args(7)//"sma_admin"
    val hbaseTableName = args(8)//抽到hbase的名字
    val rowkeyLine=args(9)//rowkey即主键
    val sqlfilepath=args(10)//saprksql
    val coreflag=args(11)//标识



    //val conn_str = "jdbc:oracle:thin:@ip:1521:orcl"
    //val hadoopRootPath = "hdfs://nameservice1/user"
    //val hfilePath = "hdfs://nameservice1/user/****"

    logger.info("-----连接" + conn_str + "的数据库-----")
    val df: Dataset[Row] = sparksession.read
      .format("jdbc")
      .option("url", conn_str)
      .option("dbtable", oracleTableName)
      .option("user", oracleusername)
      .option("password", oraclepassword)
      .option("driver", "oracle.jdbc.driver.OracleDriver")
      .load()
    logger.info("-----连接成功-----")
    //
    //df.createOrReplaceTempView("TEMP_LCPOLOTHER")
    logger.info("创建"+oracleTableName+"的临时表")
    df.createOrReplaceTempView("TEMPTABLE")


    println(port+ip+hadoopUserName+hbaseTableName)
    val config = HBaseConfiguration.create()
    config.set("hbase.zookeeper.property.clientPort", port)
    config.set("hbase.zookeeper.quorum",ip)
    config.set("hadoop.user.name",hadoopUserName)
    config.set("hbase.mapreduce.hfileoutputformat.table.name", hbaseTableName)
    config.set("hbase.mapreduce.bulkload.max.hfiles.perRegion.perFamily", "400")

    //表连接
    val connection = ConnectionFactory.createConnection(config)
    //val table = connection.getTable(TableName.valueOf(tableName))
    val regionLocator = connection.getRegionLocator(TableName.valueOf(hbaseTableName))
    //判断表是否存在,存在先清空并删除
    hbaseUtils.deleteTable(hbaseTableName, config)
    //创建表
    hbaseUtils.createspiltTable(hbaseTableName, "info", config)
    //获取sql
    val sql: String = getSql.initAllSql(sqlfilepath).getProperty(oracleTableName)
    
    
    val dframe: Dataset[Row] = sparksession.sql(sql)


    var rowkey: StringBuilder = null
    var valueCol: StringBuilder = null
    val OTableName: String = oracleTableName.substring(oracleTableName.indexOf(".")+1, oracleTableName.length)
    val valueLine: String = oracleTableColumn.getColumn(conn_str, oracleusername, oraclepassword, OTableName, "").replace(" ","")
    //dframe.map(line => line.asInstanceOf[TransferTime])
    dframe.rdd.map(row => {


      //得到rowkey
      rowkey = new StringBuilder

      var kvArray: Array[KeyValue] = null

      rowkey.delete(0, rowkey.size)
      val rowkeyArr = rowkeyLine.split(",")
      for (i <- 0 until rowkeyArr.length) {
        rowkey.append(row.getAs(rowkeyArr(i)).toString)
        rowkey.append("_")
      }
      //去掉最后一个“_”
      rowkey.deleteCharAt(rowkey.lastIndexOf("_"))
      var rowkeyhash = Bytes.toBytes(hbaseUtils.getHashConcat(rowkey.toString()))
      valueCol = new StringBuilder
      valueCol.delete(0, valueCol.size)

      //获取列名
      val valueArr = valueLine.split(",")

      val rowdata = new mutable.HashMap[String, String]()
      // logger.info("hhhhh")
      for (j <- 0 until valueArr.length) {
        //value为空就不put了
        if (row.getAs(valueArr(j)) != null && !row.getAs(valueArr(j)).equals("")) {
          rowdata.put(valueArr(j), row.getAs(valueArr(j)))
        }
      }
      // logger.info("paixu")
      //必须排序否则报错,列名必须按字典排序后才能写入
      val data: Seq[(String, String)] = rowdata.toSeq.sorted
      //logger.info("kaishi")
      if (!rowkey.equals("")) {

        kvArray = new Array[KeyValue](data.size + 1)
        for (i <- data.indices) {
          val kv: KeyValue = new KeyValue(rowkeyhash, Bytes.toBytes("info"), Bytes.toBytes(String.valueOf(data(i)._1)), Bytes.toBytes(data(i)._2))
          kvArray(i) = kv
        }

        val kv: KeyValue = new KeyValue(rowkeyhash, Bytes.toBytes("info"), Bytes.toBytes("systype"), Bytes.toBytes(coreflag))
        kvArray(data.size) = kv


      } else {
        kvArray = new Array[KeyValue](1)
        rowkeyhash = Bytes.toBytes(hbaseUtils.getHashConcat("1343678"))
        val kv = new KeyValue(rowkeyhash, Bytes.toBytes("info"), Bytes.toBytes("ccc"), Bytes.toBytes(hbaseUtils.getHashConcat("1028")))
        kvArray(0) = kv
      }
      (new ImmutableBytesWritable(rowkeyhash), kvArray)
    })
      .flatMapValues(x=>x)
      .sortByKey()
      //.sortBy(x =>x._1,true)
      .saveAsNewAPIHadoopFile(hfilePath,
        classOf[ImmutableBytesWritable],
        classOf[KeyValue],
        classOf[HFileOutputFormat2],
        config)

    val bulkLoader = new LoadIncrementalHFiles(config)

    bulkLoader.doBulkLoad(new Path(hfilePath), connection.getAdmin, connection.getTable(TableName.valueOf(hbaseTableName)), regionLocator)

    connection.close()
    logger.info(hbaseTableName + "初始化完成")
    sparksession.close()
  }

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-07-10 14:37:33  更:2021-07-10 14:39:27 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年5日历 -2024/5/6 21:12:34-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码