一.RDD中数据来源
2个地方:本地集合或外部数据源
sc.parallelize(本地集合,分区数) sc.makeRDD(本地集合,分区数) 底层是parallelizesc.textFile(HDFS/文件夹,分区数) 以行为单位读取数据sc.wholeTextFiles(HDFS/文件夹,分区数) 以文件为单位,专门读取小文件,结果是元组,第一个元素是文件路径,第二个元素是文件内容
RDD本身是不保存数据的,只保存计算逻辑
二.读取内存数据分区规则
数据可以按照并行度的设定进行数据的分区操作,数据分区规则的 i代表分区的索引,从0开始。length代表本地数据的个数。左闭右开
def positions(length: Long, numSlices: Int): Iterator[(Int, Int)] = {
(0 until numSlices).iterator.map { i =>
val start = ((i * length) / numSlices).toInt
val end = (((i + 1) * length) / numSlices).toInt
(start, end)
}
}
示例
object RddNoKey {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("Spark_RDD").setMaster("local[*]")
val sc: SparkContext = new SparkContext(conf);
val list = ListBuffer[Int]()
list.append(1,2,3,4,5)
val nums: RDD[Int] = sc.parallelize(list,3)
nums.saveAsTextFile("./output")
}
}
集合中有5个元素,length = 5,设置了三个分区那么就会分为三个分区
对于分区0,由计算规则,得 [0,1),所以分区0存放下标为0的元素1 对于分区1,由计算规则,得 [1,3),所以分区1存放2和3 对于分区2,由计算规则,的 [3,5),所以分区2存放4和5
三.读取文件数据分区规则
4.1 分区数量的计算规则
先说结论:产生的分区数和minPartitions相等或者minPartitions+1
什么时候加1什么时候相等,我们看个例子
我们读取一个文件时,可以设定一个最小分区数minPartitions = 5,不设置的话默认不会超过2
以文件为单位,看文件有多少个字节! 那么62/ 5 = 12(Byte)…2 也就说每个分区理论应该放12个字节(标准分区)的数据,但是还余下了2个字节,这时候要根据1.1规则 进行判断,如果剩余的分区大于标准分区的10%,则成为一个新的分区,在这里 2 除以 12 = 0.167,所以会产生新的分区
所以,产生的分区数和minPartitions相等或者minPartitions+1 ,所以这里应该是5+1=6个分区
4.2 数据分配到哪个分区规则
先记住两个结论 1.文件中的行是不可分割的单位 2.字节对应偏移量
上述中,产生了6个分区,每个分区的偏移量范围,注意:以偏移量12为例,偏移量12分区0可以读,分区1也可以读!
分区0 0-12 =》 [0,12] 分区1 12-24 =》[12,24] 分区2 24-36 分区3 36-48 分区4 48-60 分区5 60-64
每个分区理论应该是12个字节,也就是分区0应该放0-12偏移量单位的数据,分区1放12-24偏移量的数据,分区3放24-36偏移量的数据…
偏移量是什么?
如图,一个字符偏移量为1,第一行3个zhang和两个空格是17偏移量,但是末尾会有两个换行符偏移量为2(我们看不到),所以,第一行偏移量从0开始,到18结束
第一行有0-18偏移量,3个zhang+两个空格+2个默认的换行符,超出了理论偏移量,但是行不可分割,所以分区0放了第一行
分区1原本读12-24偏移量的数据,但是12-19已经被读了,所以只能读20-24的了,所以分区1放了liu和hao,hao的首偏移量为24,所以可以读到
分区2 读24-36,所以是wang和song song 分区3是36-48,所以是kun kun
分区4是48-60,所以是li song
分区5无数据
|