Spark 与 OSS
前言
因为公司使用的是阿里云全家桶,OSS基本上就是唯一的神,取代了HDFS成为统一文件系统,n多的数据也通过阿里亲儿子Flink接到了数仓,但是因为上报方偶尔一波离谱数据,似乎是在做压测? 而我们Flink开发同学图省事没有做错误处理,导致flink大姨妈的时候,重启消费数据重复(这里也看得出来所谓的Exact-Once 也不是完全端到端精确,假如sink跟source没有特殊处理的话还是有重复的). spark: 你们要抓的是鲁迅flink ,关我周树人spark 什么事?,流批不分家嘛,实时任务种下的恶果,擅长批处理的spark从Oss补一下数据怎么了?
spark 获取oss文件系统
补数据是一种常见场景,基本上read文件的时候下一个路径通配就行,但是免不了一些奇怪的场景需要用上,还是给一下代码吧.
scala spark oss
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.conf.Configuration
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder()
.appName("spark oss")
.getOrCreate()
val conf = new Configuration()
conf.set("fs.oss.accessKeyId","xxx")
conf.set("fs.oss.accessKeySecret","xxx")
conf.set("fs.oss.endpoint","xxx")
conf.set("fs.oss.impl","org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem")
val path = new Path("oss://path_to_list")
val fs = FileSystem.get(path.toUri, conf)
fs.exists(path)
pyspark oss
from pyspark.sql import SparkSession
spark = Spark.builder.appName("spark oss").getOrCreate()
sc = spark.sparkContext
conf = sc._jsc.hadoopConfiguration()
conf.set("fs.oss.accessKeyId","xxx")
conf.set("fs.oss.accessKeySecret","xxx")
conf.set("fs.oss.endpoint","xxx")
conf.set("fs.oss.impl","org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem")
path = sc._jvm.org.apache.hadoop.fs.Path("oss://xxxx")
fs = sc._jvm.org.apache.hadoop.fs.FileSystem.get(path.toUri(),conf)
exist = fs.exists(path)
PathFilter,不只是oss
某些场景下,可能还要让你的spark不读某些个指定文件或者只读某些个满足某种条件的文件,这个时候read里边的通配只有*,{}这种,并不是完完全全的正则,所以可能需要文件过滤来满足, PathFilter不只是oss,hdfs乃至file也好使,有用就上!
import org.apache.hadoop.fs.{Path,PathFilter}
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder()
.appName("spark path filter")
.getOrCreate()
val sc = spark.sparkContext
sc.hadoopConfiguration.setClass("mapreduce.input.pathFilter.class",classOf[MyFilter],classOf[PathFilter])
class MyFilter extends PathFilter{
override def accept(path:Path):Boolean = !path.getName.endsWith(".tmp")
}
|