IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> Spark三种连接方式 -> 正文阅读

[大数据]Spark三种连接方式

Spark三种连接方式

1、Spark core

val sparkConf: SparkConf = new SparkConf().setMaster("local").setAppName("name")
    val sparkContext = new SparkContext(sparkConf)
	//方式一
    val value: RDD[String] = sparkContext.textFile("src/main/resources/pvuvdata")
	//方式二
	val value: RDD[String] = sparkContext.parallelize(arr)

	//写出
	value.foreach(println)
	//关闭
	sparkContext.stop()

2、spark SQL

val sparkSession: SparkSession = SparkSession.builder().master("local").appName("hello01SparkSql").getOrCreate()
    //方式一、文本读入
val value: RDD[String] = sparkSession.sparkContext.textFile("src/main/resources/emp.txt")

//方式二 指定格式 读入 
val dFt1: DataFrame = sparkSession.read.format("格式").load("Path")
//方式三,规定格式读入 
 val dFt2: Dataset[Row] = sparkSession.read.格式("Path").as("Class类")

//写出到其他地方
df.show()
df.write().mode(SaveMode.Overwrite).format("格式").save("path");
df.write().mode(SaveMode.Overwrite).格式("path");

//关闭
sparkSession.stop()

2.1. 普通文本

//读取
sparkSession.sparkContext.textFile("src/main/resources/emp.txt")
//写出
empSal.saveAsTextFile("src/main/resources/t_emp_sal.txt")

2.2. json

  def main(args: Array[String]): Unit = {
    val sparkSession: SparkSession = SparkSession.builder().master("local").appName("hello01_sql").getOrCreate()
    //方式一:读取
    val dFt1: DataFrame = sparkSession.read.json("src/main/resources/t_emp1.txt").as("Emp")
    //方式二:读取
    //val dFt2: DataFrame = sparkSession.read.format("json").load("src/main/resources/t_emp.txt")

    //写出
    //dFt1.write.mode(SaveMode.Overwrite).format("json").save("src/main/resources/t_emp1.txt")
    //写出 方式二
    dFt1.write.mode(SaveMode.Overwrite).json("src/main/resources/t_emp.txt")

    dFt1.show()
    //dFt2.show()

  }

2.3. parquet

  def main(args: Array[String]): Unit = {
    val sparkSession: SparkSession = SparkSession.builder().master("local").appName("sql04_parquet").getOrCreate()
    //读入 方式一
    //val dFt1: DataFrame = sparkSession.read.format("parquet").load("src/main/resources/t_emp.txt")
    //读入 方式二
    val dFt2: Dataset[Row] = sparkSession.read.parquet("src/main/resources/t_emp.txt").as("Emp")

    //写出 方式一
    //dFt2.write.mode(SaveMode.Overwrite).format("parquet").save("src/main/resources/t_emp1.txt")
    //写出 方式二
    dFt2.write.mode(SaveMode.Overwrite).parquet("src/main/resources/t_emp1.txt")

    dFt2.show()

  }

2.4. jdbc

  def main(args: Array[String]): Unit = {
    //创建SQL环境
    val sparkSession: SparkSession = SparkSession.builder().master("local").appName("sql04_parquet").getOrCreate()
    //数据库参数
    val map = new mutable.HashMap[String, String]()
    map.put("url", "jdbc:mysql://localhost:3306/java46?useSSL=false&serverTimezone=UTC&characterEncoding=utf8")
    map.put("driver", "com.mysql.cj.jdbc.Driver")
    map.put("user", "root")
    map.put("password", "root")
    map.put("dbtable", "emp")
    //读取JDBC数据 方式一
    val dFt1: DataFrame = sparkSession.read.format("jdbc").options(map).load()

    //数据库参数
    val properties = new Properties()
    properties.setProperty("driver", "com.mysql.cj.jdbc.Driver")
    properties.setProperty("user", "root")
    properties.setProperty("password", "root")
    //读取JDBC数据 方式二
    val dFt2: DataFrame = sparkSession.read
      .jdbc("jdbc:mysql://localhost:3306/java46?useSSL=false&serverTimezone=UTC&characterEncoding=utf8", "emp", properties)

    //写入JDBC数据 方式一
    dFt2.write.mode(SaveMode.Overwrite)
      .jdbc("jdbc:mysql://localhost:3306/java46?serverTimezone=UTC&characterEncoding=utf8&useSSL=false","t_emp",properties)

    //写入JDBC数据 方式二
    map.put("dbtable", "t_emp3")
    dFt1.write.mode(SaveMode.Overwrite).format("jdbc")
      .options(map).save("jdbc:mysql://localhost:3306/java46?serverTimezone=UTC&characterEncoding=utf8&useSSL=false")

    dFt1.show()
    dFt2.show()

}

2.5. hive

  • 拷贝配置文件,从linux上拷贝hadoop、hive配置
    • hdfs-site.xml
    • core-site.xml
    • hive-site.xml
package com.yjxxtimport org.apache.spark.sql.SparkSession
	object HelloSourceHive {
	def main(args: Array[String]): Unit = {
		//搭建环境
		val spark =SparkSession.builder().master("local").appName("HelloSourceHive").enableHiveSupport().getOrCreate()
		//操作数据
		spark.sql("use yjx")
		val dataFrame = spark.sql("select * from t_user")
		dataFrame.show()
		//关闭
		sparkSession.stop()
	}
}

3、spark streaming

def main(args: Array[String]): Unit = {
  //搭建环境
  val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("App")
  //设置参数
  val streamingContext = new StreamingContext(sparkConf, streaming.Seconds(5))
  //获取数据
  val value: ReceiverInputDStream[String] = streamingContext.socketTextStream("localhost", 9999)
  //操作数据
  val dStream: DStream[(String, Int)] = value.flatMap(_.split("\\s")).map((_, 1)).reduceByKey((x, y) => x + y)
  //打印数据
  dStream.print()
    
  //开启服务
  streamingContext.start()
  //等待停止
  streamingContext.awaitTermination()
  //关闭服务
  streamingContext.stop(false)
}

3.2. streaming+kafka

  • 首先要启动zkserver
  • 再启动kafka
  def main(args: Array[String]): Unit = {
    //搭建环境
    val sparkConf: SparkConf = new SparkConf().setMaster("local[*]")
      .setAppName("stream05_kafka")
      .set("spark.streaming.stopGracefullyOnShutdown","true")
    //设置streaming封装间隔
    val streamingContext = new StreamingContext(sparkConf, streaming.Seconds(3))

    //kafka配置
    val kafkaPar: Map[String, Object] = Map[String, Object](
      "bootstrap.servers" -> "node01:9092,node02:9092,node03:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "yjx_bigdata",
      "auto.offset.reset" -> "earliest",
      "enable.auto.commit" -> (true: lang.Boolean)
    )
    //创建主题
    val topics: Array[String] = Array("userlog")
    //创建kafka
    val kfDStream: InputDStream[ConsumerRecord[String, String]]
        = KafkaUtils.createDirectStream(streamingContext, PreferConsistent, Subscribe[String, String](topics, kafkaPar))
    //编辑数据
    val result: DStream[(String, Int)] = kfDStream.map(_.value()).flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
    //打印数据
    result.print()
    //result.saveAsHadoopFiles("yjx", "txt")

    //开启streaming
    streamingContext.start()
    streamingContext.awaitTermination()
    //关闭streaming
    streamingContext.stop(false)

  }

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-08-16 11:48:52  更:2021-08-16 11:51:59 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/18 20:07:54-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码