您现在的位置是:主页 > news > 免费pc 微网站模板/最近三天的新闻大事摘抄

免费pc 微网站模板/最近三天的新闻大事摘抄

admin2025/5/2 18:33:57news

简介免费pc 微网站模板,最近三天的新闻大事摘抄,网站导航栏自适应显示,seo系统培训课程RDD(Resulient Distributed Databases, 弹性分布式数据集)代表可并行操作元素的不可变分区集合。严格来讲,RDD的转换及DAG的构成并不属于调度系统的内容,但是RDD却是调度系统操作的主要对象,因此有必要对RDD进行详细的介绍。一、为什么需要RD…

免费pc 微网站模板,最近三天的新闻大事摘抄,网站导航栏自适应显示,seo系统培训课程RDD(Resulient Distributed Databases, 弹性分布式数据集)代表可并行操作元素的不可变分区集合。严格来讲,RDD的转换及DAG的构成并不属于调度系统的内容,但是RDD却是调度系统操作的主要对象,因此有必要对RDD进行详细的介绍。一、为什么需要RD…

RDD(Resulient Distributed Databases, 弹性分布式数据集)代表可并行操作元素的不可变分区集合。严格来讲,RDD的转换及DAG的构成并不属于调度系统的内容,但是RDD却是调度系统操作的主要对象,因此有必要对RDD进行详细的介绍。

一、为什么需要RDD

1. 数据处理模型

RDD是一个容错的、并行的数据结构,可以控制将数据存储到磁盘或者内存,能够获取数据的分区。RDD提供了一组类似于Scala的操作,比如map、flatMap、filter、reduceByKey、join等,这些操作实际上是对RDD进行转换(transformation)。此外,RDD还提供了collect、foreach、count、reduce、countByKey等操作完成数据计算的动作(action)。这里的转换和动作是一种惰性机制。

通常数据处理模型包括迭代计算、关系查询、MapReduce、流式处理等。Hadoop采用MapReduce模型,storm采用流式处理模型,而Spark借助RDD实现了以上的所有模型。

2. 依赖划分原则

一个RDD包含一个或者多个分区,每个分区实际是一个数据集合的片段。在构建DAG的过程中,会将RDD用依赖关系串联起来。每个RDD都有其依赖(除了最顶级的RDD的依赖是空列表),这些依赖被划分为宽依赖和窄依赖。窄依赖会被划分到一个stage中,这样他们就能以管道(pipeline)的方式进行迭代(流水线优化宽依赖由于所依赖的分区Task不止一个,所以往往需要跨结点传输数据。从容错角度讲,他们恢复计算结果的方式不同。

3. 数据处理效率

RDD的计算过程允许在多个结点并发执行。如果数据量很大,可以适当增加分区数量,这种根据硬件条件对并发数量的控制,能更好地利用各种资源,也能有效提高Spark的数据处理效率。

4. 容错处理

传统关系型数据库往往采用日志的方式来容错,数据以来往往依赖于重新执行日志。Hadoop为了避免单机故障概率较高的问题,通常讲数据备份到其他机器来容错。RDD本身是一个不可变的(immutable)数据集,当某个Worker结点上的Task失败时,可以利用DAG重新调度计算这些失败的Task(执行已成功的Task可以从CheckPoint中读取,而不用重新计算)。在流式计算的场景中,Spark需要记录日志和CheckPoint,以便利用日志和CheckPoint对数据进行恢复。

二、RDD实现的初次分析

本文只对RDD中与调度系统相关的API进行分析。

抽象类RDD定义了所有RDD的规范,我们从RDD的属性开始,逐步了解RDD的实现。

  • sc:即SparkContext。_sc由@transient修饰,所以此属性不会被序列化。
  • deps:构造器参数之一,是Dependency的序列,用于存储当前RDD的依赖。RDD的子类在实现时不一定会传递此参数。由于deps由@transient修饰,所以此属性不会被序列化。
  • partitioner:当前RDD的分区计算器。partitioner由@transient修饰,所以此属性不会被序列化。
  • id:当前RDD的唯一身份标识。此属性通过调用SparkContext的nextRddId属性生成。
  • name:RDD的名称。name由@transient修饰,所以此属性不会被序列化。
  • dependencies_:与deps相同,但是可以被序列化。
  • partitions_:存储当前RDD的所有分区的数组。partitions_由@transient修饰,所以此属性不会被序列化。
  • storageLevel:当前RDD的存储级别。
  • creationSite:创建当前RDD的用户代码。creationSite由@transient修饰,所以此属性不会被序列化。
  • scope:当前RDD的操作作用域。scope由@transient修饰,所以此属性不会被序列化。
  • checkpointData:当前RDD的检查点数据。
  • checkpointAllMarkedAncestors:是否对所有标记了需要保存检查点的祖先保存检查点。
  • doCheckpointCalled:是否已经调用了doCheckpoint方法设置检查点。此属性可以阻止对RDD多次设置检查点。

RDD采用了模板方法的模式设计,抽象类RDD中定义了模板方法及一些未实现的接口,这些接口讲需要RDD的各个子类分别实现。

  • compute:对RDD的分区进行计算。
  • getPartitions:获取当前RDD的所有分区。
  • getDependencies:获取当前RDD的所有依赖。
  • getPreferredLocations:获取某一分区的偏好位置。

RDD中除了定义了以上接口外,还实现了一些模板方法。

1. partitions

partitions方法用于获取RDD的分区数组。

final def partitions: Array[Partition] = {checkpointRDD.map(_.partitions).getOrElse {  // 从CheckPoint中查找if (partitions_ == null) {stateLock.synchronized {if (partitions_ == null) {partitions_ = getPartitions  // 调用getPartitions方法获取partitions_.zipWithIndex.foreach { case (partition, index) =>require(partition.index == index,s"partitions($index).partition == ${partition.index}, but it should equal $index")}}}}partitions_}}

2. preferredLocations

preferredLocations方法优先调用CheckPoint中保存的RDD的getPreferredLocations方法获取指定分区的偏好位置,当没有保存CheckPoint时调用自身的getPreferredLocations方法获取指定分区的偏好位置。

final def preferredLocations(split: Partition): Seq[String] = {// 优先调用CheckPoint中保存的RDD的getPreferredLocations方法获取指定分区的偏好位置checkpointRDD.map(_.getPreferredLocations(split)).getOrElse {getPreferredLocations(split)  // 调用自身的getPreferredLocations方法获取指定分区的偏好位置}}

3. dependencies

dependencies方法用于获取当前RDD所有依赖序列。

final def dependencies: Seq[Dependency[_]] = {checkpointRDD.map(r => List(new OneToOneDependency(r))).getOrElse {if (dependencies_ == null) {stateLock.synchronized {if (dependencies_ == null) {dependencies_ = getDependencies}}}dependencies_}}
  • 从CheckPoint中获取RDD并将这些RDD封装为OneToOneDependency列表。如果从CheckPoint中获取到RDD的依赖,则返回RDD的依赖,否则进入下一步。
  • 如果dependencies_等于null,那么调用子类实现的getDependencies方法获取当前RDD的依赖后赋予dependencies,最后返回dependencies_。

4. 其他方法

除了以上的模板方法,RDD还实现了以下的一些方法。

  • 1)context:返回_sc。
  • 2)getStorageLevel:返回当前RDD的StorageLevel。
  • 3)getNarrowAncestors:用于获取RDD的祖先依赖中属于窄依赖的RDD序列。

三、RDD依赖

DAG中的RDD之间存在着依赖关系。换言之,正是RDD之间的依赖关系构建了由RDD所组成的DAG。Spark使用Dependency来表示RDD之间的依赖关系。Dependency的定义如下(抽象类)。

@DeveloperApi
abstract class Dependency[T] extends Serializable {def rdd: RDD[T]
}

抽象类Dependency只定义了一个名叫rdd的方法,此方法返回当前依赖的RDD。

Dependency分为NarrowDependency和ShuffleDependency两种依赖,下面对它们分别介绍。

1. 窄依赖

如果RDD与上游RDD的分区是一对一的关系,那么RDD和其上游RDD之间的依赖关系属于窄依赖。NarrowDependency继承了Dependency,以表示窄依赖。NarrowDependency的定义如下:

@DeveloperApi
abstract class NarrowDependency[T](_rdd: RDD[T]) extends Dependency[T] {def getParents(partitionId: Int): Seq[Int]override def rdd: RDD[T] = _rdd
}

NarrowDependency定义了一个类型为RDD的构造器参数_rdd,NarrowDependency重写了Dependency的rdd方法,让其返回_rdd。NarrowDependency还定义了一个获取某一分区的所有父级别分区序列的getParents方法。NarrowDependency一共有两个子类,它们的实现见代码如下。

@DeveloperApi
class OneToOneDependency[T](rdd: RDD[T]) extends NarrowDependency[T](rdd) {override def getParents(partitionId: Int): List[Int] = List(partitionId)
}@DeveloperApi
class RangeDependency[T](rdd: RDD[T], inStart: Int, outStart: Int, length: Int)extends NarrowDependency[T](rdd) {override def getParents(partitionId: Int): List[Int] = {if (partitionId >= outStart &amp;&amp; partitionId < outStart + length) {List(partitionId - outStart + inStart)} else {Nil}}
}

根据代码清单,OneToOneDependency重写的getParents方法告诉我们,子RDD的分区与依赖的父RDD分区相同。OneToOneDependency可以用图7.2更形象地说明。

spark源码阅读7-调度系统2(RDD详解)
图7-2:OneToOneDependency的依赖示意图

根据代码清单,RangeDependency重写了Dependency的getParents方法,其实现告诉我们RangeDependency的分区是一对一的, 且索引为partitionId的子RDD分区与索引为partitionId - outStart + inStart的父RDD分区相对应(outStart代表子RDD的分区范围起始值,inStart代表父RDD的分区范围起始值)。RangeDependency可以用图7-3更形象的说明。

spark源码阅读7-调度系统2(RDD详解)
图7-3:RangeDependency的依赖示意图

2. Shuffle依赖

RDD与上游RDD的分区如果不是一对一的关系,或者RDD的分区依赖于上游RDD的多个分区,那么这种依赖关系就叫做Shuffle依赖(ShuffleDependency)。ShuffleDependency的实现代码如下:

<pre class="wp-block-syntaxhighlighter-code">@DeveloperApi
class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](@transient private val _rdd: RDD[_ <: Product2[K, V]],val partitioner: Partitioner,val serializer: Serializer = SparkEnv.get.serializer,val keyOrdering: Option[Ordering[K]] = None,val aggregator: Option[Aggregator[K, V, C]] = None,val mapSideCombine: Boolean = false,val shuffleWriterProcessor: ShuffleWriteProcessor = new ShuffleWriteProcessor)extends Dependency[Product2[K, V]] {if (mapSideCombine) {require(aggregator.isDefined, "Map-side combine without Aggregator specified!")}override def rdd: RDD[Product2[K, V]] = _rdd.asInstanceOf[RDD[Product2[K, V]]]private[<a href="http://san1.gz01.bdysite.com/?tag=spark" title="查看与 spark 相关的文章" target="_blank">spark</a>] val keyClassName: String = reflect.classTag[K].runtimeClass.getNameprivate[spark] val valueClassName: String = reflect.classTag[V].runtimeClass.getName// Note: It's possible that the combiner class tag is null, if the combineByKey// methods in PairRDDFunctions are used instead of combineByKeyWithClassTag.private[spark] val combinerClassName: Option[String] =Option(reflect.classTag[C]).map(_.runtimeClass.getName)val shuffleId: Int = _rdd.context.newShuffleId()val shuffleHandle: ShuffleHandle = _rdd.context.env.shuffleManager.registerShuffle(shuffleId, this)_rdd.sparkContext.cleaner.foreach(_.registerShuffleForCleanup(this))_rdd.sparkContext.shuffleDriverComponents.registerShuffle(shuffleId)
}</pre>
  • 1)_rdd:泛型要求必须是Product2[K, V]及其子类的RDD。
  • 2)partitioner:分区计算器Partitioner。Partitioner将在下一小节详细介绍。
  • 3)serializer:SparkEnv中创建的serializer,即org.apache.spark.serializer.JavaSerializer。
  • 4)keyOrdering:按照K进行排序的scala.math.Ordering/的实现类。
  • 5)aggregator:对map任务的输出数据进行聚合的聚合器
  • 6)mapSideCombine:是否在map端进行合并,默认为false。
  • 7)keyClassName:Key的类名。
  • 8)valueClassName:Value的类名。
  • 9)combinerClassName:结合器C的类名。
  • 10)shuffleId:当前ShuffleDependency的身份标识。
  • 11)shuffleHandle:当前ShuffleDependency的处理器。

此外,ShuffleDependency还重写了父类Dependency的rdd方法,其实现将_rdd转换为RDD[Product2[K, V]]后返回。ShuffleDependency在构造的过程中还将自己注册到SparkContext的ContextCleaner中。

四、分区计算器Partitioner

RDD之间的依赖关系如果是Shuffle依赖,那么上游RDD该如何确定每个分区的输出将交由下游RDD的哪些分区呢?或者下游RDD的各个分区将具体依赖于上游RDD的哪些分区呢?Spark提供了分区计算器来解决这个问题。ShuffleDependency的partitioner属性的类型是Partitioner,抽象类Partitioner定义了分区计算器的接口规范,ShuffleDependency的分区取决于Partitioner的具体实现。Partitioner的定义如下(抽象类):

abstract class Partitioner extends Serializable {def numPartitions: Int  // 用于获取分区数量def getPartition(key: Any): Int  // 将输入的key映射到下游RDD的从0到numPartitions-1这一范围中的某一个分区
}

Partitioner有很多具体的实现类,它们的继承体系如图1所示。

spark源码阅读7-调度系统2(RDD详解)
图7-4:Patitioner的继承体系

Spark除图7-4中列出的Partitioner子类,还有很多Partitioner的匿名实现类,这里就不一一介绍了。本书以HashPartitioner(哈希分区计算器)为例,详细介绍Partitioner的实现。之所以选择对HashPartitioner的实现进行分析,一方面是由于其实现简洁明了,读者更容易理解;另一方面通过介绍HashPartitioner已经足够达到本书的目的。

HashPartitioner的实现代码如下:

class HashPartitioner(partitions: Int) extends Partitioner {// 增加了一个名为partitions的构造器参数作为分区数require(partitions >= 0, s"Number of partitions ($partitions) cannot be negative.")def numPartitions: Int = partitions  // 返回分区数partitionsdef getPartition(key: Any): Int = key match { // 计算出下游RDD的各个分区将具体处理哪些keycase null => 0case _ => Utils.nonNegativeMod(key.hashCode, numPartitions)  // 对key的hashCode和numPartitions进行取模运算,得到key对应的分区索引。}override def equals(other: Any): Boolean = other match {case h: HashPartitioner =>h.numPartitions == numPartitionscase _ =>false}override def hashCode: Int = numPartitions
}

由于上游RDD所处理的key的hash值在取模后可能产生数据倾斜,所以HashPartitioner并不是一个均衡的分区计算器。

根据HashPartitioner的实现,我们知道ShuffleDependency中的分区依赖关系不再是一对一的,而是取决于key,并且当前RDD的某个分区将可能依赖于ShuffleDependency的RDD的任何一个分区。经过以上分析,ShuffleDependency采用HashPartitioner后的分区依赖可以用图7-5来表示。

spark源码阅读7-调度系统2(RDD详解)
图7-5:ShuffleDependency的依赖示意图

五、RDDInfo

RDDInfo用于描述RDD的信息,RDDInfo提供的信息如下:

  • id:RDD的id。
  • name:RDD的名称。
  • numPartitions:RDD的分区数量。
  • storageLevel:RDD的存储级别(即StorageLevel)。
  • parentIds:RDD的父亲RDD的id序列。这说明一个RDD会有零到多个父RDD。
  • callSite:RDD的用户调用栈信息。
  • scope:RDD的作用域范围。scope的类型为RDDOperationScope,每一个RDD都有一个RDDOperationScope。RDDOperationScope与Stage或Job之间并无特殊关系,一个RDDOperationScope可以存在于一个Stage内,也可以跨越多个Job。
  • numCachedPartitions:缓存的分区数量。
  • memSize:使用的内存大小。
  • diskSize:使用的磁盘大小。
  • externalBlockStoreSize:Block存储在外部的大小(指不同结点)。

RDD还提供了以下的方法:

1. isCached

是否已经缓存。

def isCached: Boolean = (memSize + diskSize > 0) &amp;&amp; numCachedPartitions > 0

2. compare

由于RDDInfo继承了Ordered,所以重写了compare方法用于排序。compare的代码如下:

override def compare(that: RDDInfo): Int = {this.id - that.id  // 根据id的大小进行排序}

3. fromRdd

用于从RDD构建出相应的RDDInfo,其实现代码如下:

private[spark] object RDDInfo {def fromRdd(rdd: RDD[_]): RDDInfo = {val rddName = Option(rdd.name).getOrElse(Utils.getFormattedClassName(rdd))  // 获取rddName属性val parentIds = rdd.dependencies.map(_.rdd.id)  // 获取当前RDD依赖的所有父RDD的身份标识作为RDDInfo的parentIds属性new RDDInfo(rdd.id, rddName, rdd.partitions.length,rdd.getStorageLevel, parentIds, rdd.creationSite.shortForm, rdd.scope)  // 创建RDDInfo对象并返回}
}