This is actually a real-time project that consumes from Kafka, but it first has to sync the existing Oracle data into HBase. Previous approaches:
1. Sqoop straight into HBase, which was very slow.
2. Sqoop into Hive, create a Hive table mapped to HBase, then sync to HBase with Spark SQL, which was also quite slow.
Here is the current code (Spark generates HFiles and bulk loads them):
import org.apache.hadoop.fs.Path
import org.apache.hadoop.hbase.{HBaseConfiguration, KeyValue, TableName}
import org.apache.hadoop.hbase.client.ConnectionFactory
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
//LoadIncrementalHFiles lives here in HBase 1.x; in 2.x it is org.apache.hadoop.hbase.tool.LoadIncrementalHFiles
import org.apache.hadoop.hbase.mapreduce.{HFileOutputFormat2, LoadIncrementalHFiles}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.sql.{Dataset, Row, SparkSession}
import org.slf4j.LoggerFactory
import scala.collection.mutable

private val logger = LoggerFactory.getLogger(jdbcTes.getClass)
def main(args: Array[String]): Unit = {
val sparksession = SparkSession.builder()
.appName("jdbcTest")
.config("spark.debug.maxToStringFields", "200")
//Kryo has to be configured before the SparkContext is created, so set it on the builder
.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.getOrCreate()
import sparksession.implicits._
//val user="simple"
//val password = "sma4444"
val oracleusername = args(0)//Oracle username
val oraclepassword = args(1)//Oracle password
val conn_str = args(2)//Oracle JDBC connection string
val hfilePath = args(3)//HDFS path the HFiles are written to
val oracleTableName = args(4)//Oracle table to extract (SCHEMA.TABLE)
val port = args(5)//ZooKeeper client port
val ip = args(6)//ZooKeeper quorum, comma separated
val hadoopUserName = args(7)//e.g. "sma_admin"
val hbaseTableName = args(8)//target HBase table name
val rowkeyLine = args(9)//rowkey columns (primary key), comma separated
val sqlfilepath = args(10)//path of the Spark SQL properties file, keyed by table name
val coreflag = args(11)//source-system flag, written to the "systype" column
//val conn_str = "jdbc:oracle:thin:@ip:1521:orcl"
//val hadoopRootPath = "hdfs://nameservice1/user"
//val hfilePath = "hdfs://nameservice1/user/****"
logger.info("-----连接" + conn_str + "的数据库-----")
val df: Dataset[Row] = sparksession.read
.format("jdbc")
.option("url", conn_str)
.option("dbtable", oracleTableName)
.option("user", oracleusername)
.option("password", oraclepassword)
.option("driver", "oracle.jdbc.driver.OracleDriver")
.load()
logger.info("-----连接成功-----")
//
//df.createOrReplaceTempView("TEMP_LCPOLOTHER")
logger.info("创建"+oracleTableName+"的临时表")
df.createOrReplaceTempView("TEMPTABLE")
logger.info(s"port=$port, quorum=$ip, user=$hadoopUserName, table=$hbaseTableName")
val config = HBaseConfiguration.create()
config.set("hbase.zookeeper.property.clientPort", port)
config.set("hbase.zookeeper.quorum",ip)
config.set("hadoop.user.name",hadoopUserName)
config.set("hbase.mapreduce.hfileoutputformat.table.name", hbaseTableName)
config.set("hbase.mapreduce.bulkload.max.hfiles.perRegion.perFamily", "400")
//open the HBase connection
val connection = ConnectionFactory.createConnection(config)
//val table = connection.getTable(TableName.valueOf(tableName))
val regionLocator = connection.getRegionLocator(TableName.valueOf(hbaseTableName))
//if the table already exists, drop it first
hbaseUtils.deleteTable(hbaseTableName, config)
//create the (pre-split) table with column family "info"
hbaseUtils.createspiltTable(hbaseTableName, "info", config)
//read the extraction SQL for this table from the properties file
val sql: String = getSql.initAllSql(sqlfilepath).getProperty(oracleTableName)
val dframe: Dataset[Row] = sparksession.sql(sql)
//strip the schema prefix (SCHEMA.TABLE -> TABLE)
val OTableName: String = oracleTableName.substring(oracleTableName.indexOf(".") + 1, oracleTableName.length)
//comma-separated list of the table's column names (fetched from Oracle by the helper)
val valueLine: String = oracleTableColumn.getColumn(conn_str, oracleusername, oraclepassword, OTableName, "").replace(" ", "")
//dframe.map(line => line.asInstanceOf[TransferTime])
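//for each row: build a hashed rowkey from the configured key columns and one KeyValue per
//non-empty column under family "info", plus a "systype" cell carrying the source-system flag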
dframe.rdd.map(row => {
//build the rowkey by joining the key columns with "_"
val rowkey = new StringBuilder
var kvArray: Array[KeyValue] = null
val rowkeyArr = rowkeyLine.split(",")
for (i <- 0 until rowkeyArr.length) {
rowkey.append(row.getAs[Any](rowkeyArr(i)).toString)
rowkey.append("_")
}
//drop the trailing "_"
rowkey.deleteCharAt(rowkey.lastIndexOf("_"))
//hash/salt the rowkey via the helper before using it as the HBase key
var rowkeyhash = Bytes.toBytes(hbaseUtils.getHashConcat(rowkey.toString()))
//column names of the table
val valueArr = valueLine.split(",")
val rowdata = new mutable.HashMap[String, String]()
for (j <- 0 until valueArr.length) {
val v = row.getAs[Any](valueArr(j))
//skip null / empty values
if (v != null && !v.equals("")) {
rowdata.put(valueArr(j), v.toString)
}
}
//must be sorted: HFileOutputFormat2 requires the cells of a row in qualifier order,
//otherwise the job fails while writing the HFiles
val data: Seq[(String, String)] = rowdata.toSeq.sorted
if (rowkey.nonEmpty) {
kvArray = new Array[KeyValue](data.size + 1)
for (i <- data.indices) {
val kv: KeyValue = new KeyValue(rowkeyhash, Bytes.toBytes("info"), Bytes.toBytes(String.valueOf(data(i)._1)), Bytes.toBytes(data(i)._2))
kvArray(i) = kv
}
val kv: KeyValue = new KeyValue(rowkeyhash, Bytes.toBytes("info"), Bytes.toBytes("systype"), Bytes.toBytes(coreflag))
kvArray(data.size) = kv
} else {
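//no rowkey could be built: write a single placeholder cell instead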
kvArray = new Array[KeyValue](1)
rowkeyhash = Bytes.toBytes(hbaseUtils.getHashConcat("1343678"))
val kv = new KeyValue(rowkeyhash, Bytes.toBytes("info"), Bytes.toBytes("ccc"), Bytes.toBytes(hbaseUtils.getHashConcat("1028")))
kvArray(0) = kv
}
(new ImmutableBytesWritable(rowkeyhash), kvArray)
})
.flatMapValues(x=>x)
.sortByKey()
//.sortBy(x =>x._1,true)
.saveAsNewAPIHadoopFile(hfilePath,
classOf[ImmutableBytesWritable],
classOf[KeyValue],
classOf[HFileOutputFormat2],
config)
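//the sorted (rowkey, KeyValue) pairs are written as HFiles under hfilePath; the bulk load below
//then moves those files straight into the table's regions, bypassing the normal write path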
val bulkLoader = new LoadIncrementalHFiles(config)
bulkLoader.doBulkLoad(new Path(hfilePath), connection.getAdmin, connection.getTable(TableName.valueOf(hbaseTableName)), regionLocator)
connection.close()
logger.info(hbaseTableName + " initialization finished")
sparksession.close()
}
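
The snippet relies on a few helpers that are not shown (hbaseUtils.deleteTable / createspiltTable / getHashConcat, getSql.initAllSql, oracleTableColumn.getColumn). For anyone trying to reproduce it, here is a minimal sketch of what the HBase-side helpers could look like; the object name hbaseUtilsSketch, the HBase 1.x admin API, the 15 hex split points and the MD5-salted rowkey are my assumptions, not the original implementation:

import java.security.MessageDigest

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.client.ConnectionFactory
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.{HColumnDescriptor, HTableDescriptor, TableName}

object hbaseUtilsSketch {

  // drop the table if it already exists (hypothetical stand-in for hbaseUtils.deleteTable)
  def deleteTable(tableName: String, config: Configuration): Unit = {
    val conn = ConnectionFactory.createConnection(config)
    val admin = conn.getAdmin
    val tn = TableName.valueOf(tableName)
    if (admin.tableExists(tn)) {
      admin.disableTable(tn)
      admin.deleteTable(tn)
    }
    admin.close()
    conn.close()
  }

  // create a pre-split table so the sorted HFiles spread over several regions
  // (hypothetical stand-in for hbaseUtils.createspiltTable)
  def createspiltTable(tableName: String, family: String, config: Configuration): Unit = {
    val conn = ConnectionFactory.createConnection(config)
    val admin = conn.getAdmin
    val desc = new HTableDescriptor(TableName.valueOf(tableName))
    desc.addFamily(new HColumnDescriptor(family))
    // 15 split points on the first hex character of the salted rowkey -> 16 regions (assumption)
    val splitKeys = "123456789abcdef".map(c => Bytes.toBytes(c.toString)).toArray
    admin.createTable(desc, splitKeys)
    admin.close()
    conn.close()
  }

  // salt the rowkey with a short MD5 prefix so rows spread evenly over the regions (assumption)
  def getHashConcat(rowkey: String): String = {
    val md5 = MessageDigest.getInstance("MD5")
      .digest(rowkey.getBytes("UTF-8"))
      .map("%02x".format(_))
      .mkString
    md5.substring(0, 4) + "_" + rowkey
  }
}

Whatever the real implementations are, the important invariant is that the split keys used when creating the table match the key distribution produced by getHashConcat; otherwise most of the generated HFiles land in a single region and the bulk load degenerates.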