IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> SparkStreaming(java)读取Kafka(kerberos)写入Hbase(kerberos) -> 正文阅读

[大数据]SparkStreaming(java)读取Kafka(kerberos)写入Hbase(kerberos)

SparkStreaming(java)读取Kafka(kerberos)写入Hbase(kerberos)

概述

你好!我们这次的目标是使用SparkStreaming(java)读取kafka中的数据,写入Hbase,关于向Kafka内写数据的知识请参考:

开始

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Job;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;
import scala.Tuple2;

import java.io.IOException;
import java.util.*;

public class Temp {
    public static void main(String[] args) {
        SparkConf sparkConf = new SparkConf();
        sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
        sparkConf.registerKryoClasses((Class<?>[]) Collections.singletonList(ConsumerRecord.class).toArray());
        SparkSession spark = SparkSession.builder().
                appName("DevelopSpark")
                .master("local[*]")
                .config(sparkConf)
                .enableHiveSupport()
                .getOrCreate();

        JavaSparkContext sparkContext = JavaSparkContext.fromSparkContext(spark.sparkContext());

        JavaStreamingContext javaStreamingContext = new JavaStreamingContext(sparkContext, Durations.seconds(5));
        String bootstrapServer = "192.168.165.63:9092,192.168.165.61:9092,192.168.165.62:9092";
        //zookeeper hostName
        String zkQuorum4 = "zk-56,zk-66,zk-67";
        // 此处需要topic名称
        List<String> topics = Arrays.asList("[topicName1]","[topicName2]");
        String keytabPath = "/Users/keytabs/user.keytab";
        // 准备kafka所需参数
        Map<String, Object> kafkaParams = new HashMap<String, Object>();
        kafkaParams.put("bootstrap.servers",bootstrapServer);
        kafkaParams.put("key.deserializer", StringDeserializer.class);
        kafkaParams.put("value.deserializer", StringDeserializer.class);
        kafkaParams.put("group.id","1");
        kafkaParams.put("auto.offset.reset","latest");
        kafkaParams.put("security.protocol","SASL_PLAINTEXT");
        kafkaParams.put("sasl.kerberos.service.name","henghe");
        kafkaParams.put("sasl.mechanism","GSSAPI");
        kafkaParams.put("sasl.jaas.config","com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true keyTab=\"" + keytabPath + "\" principal=\"user@USER.COM\";");
        kafkaParams.put("enable.auto.commit",false);
        //读kafka
        JavaInputDStream<ConsumerRecord<String, String>> stream = KafkaUtils.createDirectStream(javaStreamingContext,
                LocationStrategies.PreferConsistent(),
                ConsumerStrategies.Subscribe(topics, kafkaParams));

        //准备hbase所需参数
        Configuration config = HBaseConfiguration.create();
        config.setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, 30000);
        config.setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, 30000);

        config.set("hbase.zookeeper.quorum", zkQuorum4);
        config.set("zookeeper.znode.parent", "/hbase1");
        config.set("hadoop.security.authentication", "kerberos");
        config.set("hbase.security.authentication", "kerberos");
        config.set("hbase.master.kerberos.principal", "henghe/_HOST@HENGHE.COM");
        config.set("hbase.thrift.kerberos.principal", "henghe/_HOST@HENGHE.COM");
        config.set("hbase.regionserver.kerberos.principal", "henghe/_HOST@HENGHE.COM");
        config.set("hbase.client.keytab.principal", "henghe@HENGHE.COM");
        config.set("hbase.client.userprovider.class", "org.apache.hadoop.hbase.security.UserProvider");
        config.set("hbase.client.keytab.file", keytabPath);
        // 此处需要填写表名
        config.set(TableOutputFormat.OUTPUT_TABLE,"[tableName]");
        JobConf jobConf = new JobConf(config);
        Job job = null;
        try {
            job = Job.getInstance(jobConf);
        } catch (IOException e) {
            e.printStackTrace();
        }
        job.setOutputFormatClass(org.apache.hadoop.hbase.mapreduce.TableOutputFormat.class);
        job.setOutputKeyClass(ImmutableBytesWritable.class);
        job.setOutputValueClass(Put.class);

        Job finalJob = job;
        // 写hbase
        stream.foreachRDD(rdd -> {
            if (rdd.count()>0){
                JavaRDD<Put> putJavaRDD = rdd.map(cr -> {
                    String mes = cr.value();
                    //  此处需要rowkey
                    Put put = new Put(Bytes.toBytes(mes));
                    // 此处分别需要 列族名,列名,和数据
                    put.addColumn(Bytes.toBytes("[familyName]"), Bytes.toBytes("[colName]"), Bytes.toBytes(mes));
                    return put;
                });
                JavaPairRDD<ImmutableBytesWritable, Put> javaPairRDD = putJavaRDD
                        .mapToPair((PairFunction<Put, ImmutableBytesWritable, Put>) p -> new Tuple2<>(new ImmutableBytesWritable(), p));
                javaPairRDD.saveAsNewAPIHadoopDataset(finalJob.getConfiguration());
                System.out.println("ok......");
            }
        });
    }
}
  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2022-04-30 08:47:05  更:2022-04-30 08:50:06 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/16 9:54:13-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码