IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> Spark 与 OSS -> 正文阅读

[大数据]Spark 与 OSS

Spark 与 OSS

前言

因为公司使用的是阿里云全家桶,OSS基本上就是唯一的神,取代了HDFS成为统一文件系统,n多的数据也通过阿里亲儿子Flink接到了数仓,但是因为上报方偶尔一波离谱数据,似乎是在做压测?
而我们Flink开发同学图省事没有做错误处理,导致flink大姨妈的时候,重启消费数据重复(这里也看得出来所谓的Exact-Once也不是完全端到端精确,假如sink跟source没有特殊处理的话还是有重复的).
spark: 你们要抓的是鲁迅flink,关我周树人spark什么事?,流批不分家嘛,实时任务种下的恶果,擅长批处理的spark从Oss补一下数据怎么了?

spark 获取oss文件系统

补数据是一种常见场景,基本上read文件的时候下一个路径通配就行,但是免不了一些奇怪的场景需要用上,还是给一下代码吧.

scala spark oss

import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.conf.Configuration
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
				.appName("spark oss")
				.getOrCreate()
val conf = new Configuration()
conf.set("fs.oss.accessKeyId","xxx")
conf.set("fs.oss.accessKeySecret","xxx")
conf.set("fs.oss.endpoint","xxx")
//fs.oss.impl很关键,否则会报错No module named OSSFileStystem
conf.set("fs.oss.impl","org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem") 
val path = new Path("oss://path_to_list")
// 这个fs就是FileSystem对象,就像Java中的FileSystem那么用就可以
val fs = FileSystem.get(path.toUri, conf)
//判断文件是否存在
fs.exists(path)

pyspark oss

from pyspark.sql import SparkSession

spark = Spark.builder.appName("spark oss").getOrCreate()
sc = spark.sparkContext
conf = sc._jsc.hadoopConfiguration()
conf.set("fs.oss.accessKeyId","xxx")
conf.set("fs.oss.accessKeySecret","xxx")
conf.set("fs.oss.endpoint","xxx")
conf.set("fs.oss.impl","org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem")

path = sc._jvm.org.apache.hadoop.fs.Path("oss://xxxx")
fs = sc._jvm.org.apache.hadoop.fs.FileSystem.get(path.toUri(),conf)
exist = fs.exists(path)

PathFilter,不只是oss

某些场景下,可能还要让你的spark不读某些个指定文件或者只读某些个满足某种条件的文件,这个时候read里边的通配只有*,{}这种,并不是完完全全的正则,所以可能需要文件过滤来满足,
PathFilter不只是oss,hdfs乃至file也好使,有用就上!

import org.apache.hadoop.fs.{Path,PathFilter}
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
				.appName("spark path filter")
				.getOrCreate()
val sc = spark.sparkContext
sc.hadoopConfiguration.setClass("mapreduce.input.pathFilter.class",classOf[MyFilter],classOf[PathFilter])

class MyFilter extends PathFilter{
	//可以让满足条件的文件被accept,比如不读.tmp结尾的文件.
  override def accept(path:Path):Boolean = !path.getName.endsWith(".tmp")
}

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-08-21 15:32:29  更:2021-08-21 15:34:26 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年11日历 -2024/11/23 13:27:47-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码