搜文章
推荐 原创 视频 Java开发 iOS开发 前端开发 JavaScript开发 Android开发 PHP开发 数据库 开发工具 Python开发 Kotlin开发 Ruby开发 .NET开发 服务器运维 开放平台 架构师 大数据 云计算 人工智能 开发语言 其它开发
Lambda在线 > 张小小凡 > Spark复习-RDD

Spark复习-RDD

张小小凡 2020-12-21

1.RDD的介绍

RDD叫做弹性分布式数据集,但是他本身并不保存数据。代码中是一个抽象类,它代表一个弹性的、不可变、可分区、里面的元素可并行计算的集合。

  • 弹性
    • 存储的弹性:内存与磁盘的自动切换
    • 容错的弹性:数据丢失可以自动恢复
    • 计算的弹性:计算出错重试机制
    • 分片的弹性:可根据需要重新分片
  • 分布式:数据存储在大数据集群不同的节点上
  • 数据集:RDD封装了计算逻辑,不包含数据
  • 可分区并行计算

2.核心属性

  • 分区列表:RDD 数据结构中存在分区列表,用于执行任务时并行计算,是实现分布式计算的重要属性。
  • 分区计算函数:对每个分区进行相同的逻辑处理
  • RDD之间的依赖关系也叫血缘关系,这个非常重要后续再详细介绍
  • 分区器(可选)
  • 首选位置(可选)

2.1分区器

类似于kafka中的分区,如图中把数据分为1,2 和3,4。也可以分成1,3和2,4。如果数据是kv形的,可以根据k来进行分区。需要自己自定义

2.2首选位置

如图封装好的Task1为什么要发给Executor1为什么不发给Executor2呢。这里有一个首选位置的规则。因为我们读取的数据是分布式的存在各个节点上的。如果1,2数据在Executor所在的节点上,那么把task1发给Executor1就是最优的选择。省去了把数据发给Executor2所在节点产生的网络io的消耗。

当然某些情况下也会把task1发给Executor2。比如Executor1所在节点没有运行资源了,被别的任务占Task1会等待一段时间,如果超时会进行第二优选择,发给离Executor1所在节点拓扑距离最近的节点执行。

RDD读取数据处理的流程。不管是textfile读取文件,还是sparksql读取数据库,还是sparkstreaming读取流式数据,最后都会转换成RDD执行。

3.RDD数据切分的规则

3.1 rdd从内存中创建时

源码:

    def positions(length: Long, numSlices: Int): Iterator[(IntInt)] = {
      (0 until numSlices).iterator.map { i =>
        val start = ((i * length) / numSlices).toInt
        val end = (((i + 1) * length) / numSlices).toInt
        (start, end)
      }
    }

python实现:

a=[1,2,3,4,5]

def slice(list,numSlices):
    result=[]
    length=len(list)# 数据集的个数
    
    for i in range(0,numSlices):
        start=int(i*length/numSlices)
        end=int((i+1)*length/numSlices)
        result.append((start,end))
    return result

ret=slice(a,3)
结果为:[(01), (13), (35)]

3.2 rdd从文件中创建时

贴一部分代码吧:

long blockSize = file.getBlockSize();
long splitSize = this.computeSplitSize(goalSize, minSize, blockSize);
long bytesRemaining;
String[][] splitHosts;
for(bytesRemaining = length; (double)bytesRemaining / (double)splitSize > 1.1D; bytesRemaining -= splitSize) {
    splitHosts = this.getSplitHostsAndCachedHosts(blkLocations, length - bytesRemaining, splitSize, clusterMap);
    splits.add(this.makeSplit(path, length - bytesRemaining, splitSize, splitHosts[0], splitHosts[1]));
}
if (bytesRemaining != 0L) {
    splitHosts = this.getSplitHostsAndCachedHosts(blkLocations, length - bytesRemaining, bytesRemaining, clusterMap);
    splits.add(this.makeSplit(path, length - bytesRemaining, bytesRemaining, splitHosts[0], splitHosts[1]));
}

当我们使用sc.textFile创建rdd时,底层调用的其实是hadoop读取文件的形式。

首先计算文件的总字节数size,用size/我们希望的分区数,当不能整除时。如果余数<平均分区字节数*0.1时,不会再增加分区直接合并到最后一个分区,这样做是防止产生小文件。当>0.1时,会增加一个分区。所以实际的分区数可能是我们指定的分区数+1。

hadoop读取文件是一行一行读的,并且索引不会重复读取。所以会出现有的分区没有数据的情况。

4.总结

下回分享RDD的各种算子,也是开发中最重要的部分。

转载美三代  点赞富一生



版权声明:本站内容全部来自于腾讯微信公众号,属第三方自助推荐收录。《Spark复习-RDD》的版权归原作者「张小小凡」所有,文章言论观点不代表Lambda在线的观点, Lambda在线不承担任何法律责任。如需删除可联系QQ:516101458

文章来源: 阅读原文

相关阅读

关注张小小凡微信公众号

张小小凡微信公众号:flinkoldbird

张小小凡

手机扫描上方二维码即可关注张小小凡微信公众号

张小小凡最新文章

精品公众号随机推荐