您现在的位置是:主页 > news > 免费pc 微网站模板/最近三天的新闻大事摘抄
免费pc 微网站模板/最近三天的新闻大事摘抄
admin2025/5/2 18:33:57【news】
简介免费pc 微网站模板,最近三天的新闻大事摘抄,网站导航栏自适应显示,seo系统培训课程RDD(Resulient Distributed Databases, 弹性分布式数据集)代表可并行操作元素的不可变分区集合。严格来讲,RDD的转换及DAG的构成并不属于调度系统的内容,但是RDD却是调度系统操作的主要对象,因此有必要对RDD进行详细的介绍。一、为什么需要RD…
RDD(Resulient Distributed Databases, 弹性分布式数据集)代表可并行操作元素的不可变分区集合。严格来讲,RDD的转换及DAG的构成并不属于调度系统的内容,但是RDD却是调度系统操作的主要对象,因此有必要对RDD进行详细的介绍。
一、为什么需要RDD
1. 数据处理模型
RDD是一个容错的、并行的数据结构,可以控制将数据存储到磁盘或者内存,能够获取数据的分区。RDD提供了一组类似于Scala的操作,比如map、flatMap、filter、reduceByKey、join等,这些操作实际上是对RDD进行转换(transformation)。此外,RDD还提供了collect、foreach、count、reduce、countByKey等操作完成数据计算的动作(action)。这里的转换和动作是一种惰性机制。
通常数据处理模型包括迭代计算、关系查询、MapReduce、流式处理等。Hadoop采用MapReduce模型,storm采用流式处理模型,而Spark借助RDD实现了以上的所有模型。
2. 依赖划分原则
一个RDD包含一个或者多个分区,每个分区实际是一个数据集合的片段。在构建DAG的过程中,会将RDD用依赖关系串联起来。每个RDD都有其依赖(除了最顶级的RDD的依赖是空列表),这些依赖被划分为宽依赖和窄依赖。窄依赖会被划分到一个stage中,这样他们就能以管道(pipeline)的方式进行迭代(流水线优化)宽依赖由于所依赖的分区Task不止一个,所以往往需要跨结点传输数据。从容错角度讲,他们恢复计算结果的方式不同。
3. 数据处理效率
RDD的计算过程允许在多个结点并发执行。如果数据量很大,可以适当增加分区数量,这种根据硬件条件对并发数量的控制,能更好地利用各种资源,也能有效提高Spark的数据处理效率。
4. 容错处理
传统关系型数据库往往采用日志的方式来容错,数据以来往往依赖于重新执行日志。Hadoop为了避免单机故障概率较高的问题,通常讲数据备份到其他机器来容错。RDD本身是一个不可变的(immutable)数据集,当某个Worker结点上的Task失败时,可以利用DAG重新调度计算这些失败的Task(执行已成功的Task可以从CheckPoint中读取,而不用重新计算)。在流式计算的场景中,Spark需要记录日志和CheckPoint,以便利用日志和CheckPoint对数据进行恢复。
二、RDD实现的初次分析
本文只对RDD中与调度系统相关的API进行分析。
抽象类RDD定义了所有RDD的规范,我们从RDD的属性开始,逐步了解RDD的实现。
- sc:即SparkContext。_sc由@transient修饰,所以此属性不会被序列化。
- deps:构造器参数之一,是Dependency的序列,用于存储当前RDD的依赖。RDD的子类在实现时不一定会传递此参数。由于deps由@transient修饰,所以此属性不会被序列化。
- partitioner:当前RDD的分区计算器。partitioner由@transient修饰,所以此属性不会被序列化。
- id:当前RDD的唯一身份标识。此属性通过调用SparkContext的nextRddId属性生成。
- name:RDD的名称。name由@transient修饰,所以此属性不会被序列化。
- dependencies_:与deps相同,但是可以被序列化。
- partitions_:存储当前RDD的所有分区的数组。partitions_由@transient修饰,所以此属性不会被序列化。
- storageLevel:当前RDD的存储级别。
- creationSite:创建当前RDD的用户代码。creationSite由@transient修饰,所以此属性不会被序列化。
- scope:当前RDD的操作作用域。scope由@transient修饰,所以此属性不会被序列化。
- checkpointData:当前RDD的检查点数据。
- checkpointAllMarkedAncestors:是否对所有标记了需要保存检查点的祖先保存检查点。
- doCheckpointCalled:是否已经调用了doCheckpoint方法设置检查点。此属性可以阻止对RDD多次设置检查点。
RDD采用了模板方法的模式设计,抽象类RDD中定义了模板方法及一些未实现的接口,这些接口讲需要RDD的各个子类分别实现。
- compute:对RDD的分区进行计算。
- getPartitions:获取当前RDD的所有分区。
- getDependencies:获取当前RDD的所有依赖。
- getPreferredLocations:获取某一分区的偏好位置。
RDD中除了定义了以上接口外,还实现了一些模板方法。
1. partitions
partitions方法用于获取RDD的分区数组。
final def partitions: Array[Partition] = {checkpointRDD.map(_.partitions).getOrElse { // 从CheckPoint中查找if (partitions_ == null) {stateLock.synchronized {if (partitions_ == null) {partitions_ = getPartitions // 调用getPartitions方法获取partitions_.zipWithIndex.foreach { case (partition, index) =>require(partition.index == index,s"partitions($index).partition == ${partition.index}, but it should equal $index")}}}}partitions_}}
2. preferredLocations
preferredLocations方法优先调用CheckPoint中保存的RDD的getPreferredLocations方法获取指定分区的偏好位置,当没有保存CheckPoint时调用自身的getPreferredLocations方法获取指定分区的偏好位置。
final def preferredLocations(split: Partition): Seq[String] = {// 优先调用CheckPoint中保存的RDD的getPreferredLocations方法获取指定分区的偏好位置checkpointRDD.map(_.getPreferredLocations(split)).getOrElse {getPreferredLocations(split) // 调用自身的getPreferredLocations方法获取指定分区的偏好位置}}
3. dependencies
dependencies方法用于获取当前RDD所有依赖序列。
final def dependencies: Seq[Dependency[_]] = {checkpointRDD.map(r => List(new OneToOneDependency(r))).getOrElse {if (dependencies_ == null) {stateLock.synchronized {if (dependencies_ == null) {dependencies_ = getDependencies}}}dependencies_}}
- 从CheckPoint中获取RDD并将这些RDD封装为OneToOneDependency列表。如果从CheckPoint中获取到RDD的依赖,则返回RDD的依赖,否则进入下一步。
- 如果dependencies_等于null,那么调用子类实现的getDependencies方法获取当前RDD的依赖后赋予dependencies,最后返回dependencies_。
4. 其他方法
除了以上的模板方法,RDD还实现了以下的一些方法。
- 1)context:返回_sc。
- 2)getStorageLevel:返回当前RDD的StorageLevel。
- 3)getNarrowAncestors:用于获取RDD的祖先依赖中属于窄依赖的RDD序列。
三、RDD依赖
DAG中的RDD之间存在着依赖关系。换言之,正是RDD之间的依赖关系构建了由RDD所组成的DAG。Spark使用Dependency来表示RDD之间的依赖关系。Dependency的定义如下(抽象类)。
@DeveloperApi
abstract class Dependency[T] extends Serializable {def rdd: RDD[T]
}
抽象类Dependency只定义了一个名叫rdd的方法,此方法返回当前依赖的RDD。
Dependency分为NarrowDependency和ShuffleDependency两种依赖,下面对它们分别介绍。
1. 窄依赖
如果RDD与上游RDD的分区是一对一的关系,那么RDD和其上游RDD之间的依赖关系属于窄依赖。NarrowDependency继承了Dependency,以表示窄依赖。NarrowDependency的定义如下:
@DeveloperApi
abstract class NarrowDependency[T](_rdd: RDD[T]) extends Dependency[T] {def getParents(partitionId: Int): Seq[Int]override def rdd: RDD[T] = _rdd
}
NarrowDependency定义了一个类型为RDD的构造器参数_rdd,NarrowDependency重写了Dependency的rdd方法,让其返回_rdd。NarrowDependency还定义了一个获取某一分区的所有父级别分区序列的getParents方法。NarrowDependency一共有两个子类,它们的实现见代码如下。
@DeveloperApi
class OneToOneDependency[T](rdd: RDD[T]) extends NarrowDependency[T](rdd) {override def getParents(partitionId: Int): List[Int] = List(partitionId)
}@DeveloperApi
class RangeDependency[T](rdd: RDD[T], inStart: Int, outStart: Int, length: Int)extends NarrowDependency[T](rdd) {override def getParents(partitionId: Int): List[Int] = {if (partitionId >= outStart && partitionId < outStart + length) {List(partitionId - outStart + inStart)} else {Nil}}
}
根据代码清单,OneToOneDependency重写的getParents方法告诉我们,子RDD的分区与依赖的父RDD分区相同。OneToOneDependency可以用图7.2更形象地说明。
根据代码清单,RangeDependency重写了Dependency的getParents方法,其实现告诉我们RangeDependency的分区是一对一的, 且索引为partitionId的子RDD分区与索引为partitionId - outStart + inStart的父RDD分区相对应(outStart代表子RDD的分区范围起始值,inStart代表父RDD的分区范围起始值)。RangeDependency可以用图7-3更形象的说明。
2. Shuffle依赖
RDD与上游RDD的分区如果不是一对一的关系,或者RDD的分区依赖于上游RDD的多个分区,那么这种依赖关系就叫做Shuffle依赖(ShuffleDependency)。ShuffleDependency的实现代码如下:
<pre class="wp-block-syntaxhighlighter-code">@DeveloperApi
class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](@transient private val _rdd: RDD[_ <: Product2[K, V]],val partitioner: Partitioner,val serializer: Serializer = SparkEnv.get.serializer,val keyOrdering: Option[Ordering[K]] = None,val aggregator: Option[Aggregator[K, V, C]] = None,val mapSideCombine: Boolean = false,val shuffleWriterProcessor: ShuffleWriteProcessor = new ShuffleWriteProcessor)extends Dependency[Product2[K, V]] {if (mapSideCombine) {require(aggregator.isDefined, "Map-side combine without Aggregator specified!")}override def rdd: RDD[Product2[K, V]] = _rdd.asInstanceOf[RDD[Product2[K, V]]]private[<a href="http://san1.gz01.bdysite.com/?tag=spark" title="查看与 spark 相关的文章" target="_blank">spark</a>] val keyClassName: String = reflect.classTag[K].runtimeClass.getNameprivate[spark] val valueClassName: String = reflect.classTag[V].runtimeClass.getName// Note: It's possible that the combiner class tag is null, if the combineByKey// methods in PairRDDFunctions are used instead of combineByKeyWithClassTag.private[spark] val combinerClassName: Option[String] =Option(reflect.classTag[C]).map(_.runtimeClass.getName)val shuffleId: Int = _rdd.context.newShuffleId()val shuffleHandle: ShuffleHandle = _rdd.context.env.shuffleManager.registerShuffle(shuffleId, this)_rdd.sparkContext.cleaner.foreach(_.registerShuffleForCleanup(this))_rdd.sparkContext.shuffleDriverComponents.registerShuffle(shuffleId)
}</pre>
- 1)_rdd:泛型要求必须是Product2[K, V]及其子类的RDD。
- 2)partitioner:分区计算器Partitioner。Partitioner将在下一小节详细介绍。
- 3)serializer:SparkEnv中创建的serializer,即org.apache.spark.serializer.JavaSerializer。
- 4)keyOrdering:按照K进行排序的scala.math.Ordering/的实现类。
- 5)aggregator:对map任务的输出数据进行聚合的聚合器
- 6)mapSideCombine:是否在map端进行合并,默认为false。
- 7)keyClassName:Key的类名。
- 8)valueClassName:Value的类名。
- 9)combinerClassName:结合器C的类名。
- 10)shuffleId:当前ShuffleDependency的身份标识。
- 11)shuffleHandle:当前ShuffleDependency的处理器。
此外,ShuffleDependency还重写了父类Dependency的rdd方法,其实现将_rdd转换为RDD[Product2[K, V]]后返回。ShuffleDependency在构造的过程中还将自己注册到SparkContext的ContextCleaner中。
四、分区计算器Partitioner
RDD之间的依赖关系如果是Shuffle依赖,那么上游RDD该如何确定每个分区的输出将交由下游RDD的哪些分区呢?或者下游RDD的各个分区将具体依赖于上游RDD的哪些分区呢?Spark提供了分区计算器来解决这个问题。ShuffleDependency的partitioner属性的类型是Partitioner,抽象类Partitioner定义了分区计算器的接口规范,ShuffleDependency的分区取决于Partitioner的具体实现。Partitioner的定义如下(抽象类):
abstract class Partitioner extends Serializable {def numPartitions: Int // 用于获取分区数量def getPartition(key: Any): Int // 将输入的key映射到下游RDD的从0到numPartitions-1这一范围中的某一个分区
}
Partitioner有很多具体的实现类,它们的继承体系如图1所示。
Spark除图7-4中列出的Partitioner子类,还有很多Partitioner的匿名实现类,这里就不一一介绍了。本书以HashPartitioner(哈希分区计算器)为例,详细介绍Partitioner的实现。之所以选择对HashPartitioner的实现进行分析,一方面是由于其实现简洁明了,读者更容易理解;另一方面通过介绍HashPartitioner已经足够达到本书的目的。
HashPartitioner的实现代码如下:
class HashPartitioner(partitions: Int) extends Partitioner {// 增加了一个名为partitions的构造器参数作为分区数require(partitions >= 0, s"Number of partitions ($partitions) cannot be negative.")def numPartitions: Int = partitions // 返回分区数partitionsdef getPartition(key: Any): Int = key match { // 计算出下游RDD的各个分区将具体处理哪些keycase null => 0case _ => Utils.nonNegativeMod(key.hashCode, numPartitions) // 对key的hashCode和numPartitions进行取模运算,得到key对应的分区索引。}override def equals(other: Any): Boolean = other match {case h: HashPartitioner =>h.numPartitions == numPartitionscase _ =>false}override def hashCode: Int = numPartitions
}
由于上游RDD所处理的key的hash值在取模后可能产生数据倾斜,所以HashPartitioner并不是一个均衡的分区计算器。
根据HashPartitioner的实现,我们知道ShuffleDependency中的分区依赖关系不再是一对一的,而是取决于key,并且当前RDD的某个分区将可能依赖于ShuffleDependency的RDD的任何一个分区。经过以上分析,ShuffleDependency采用HashPartitioner后的分区依赖可以用图7-5来表示。
五、RDDInfo
RDDInfo用于描述RDD的信息,RDDInfo提供的信息如下:
- id:RDD的id。
- name:RDD的名称。
- numPartitions:RDD的分区数量。
- storageLevel:RDD的存储级别(即StorageLevel)。
- parentIds:RDD的父亲RDD的id序列。这说明一个RDD会有零到多个父RDD。
- callSite:RDD的用户调用栈信息。
- scope:RDD的作用域范围。scope的类型为RDDOperationScope,每一个RDD都有一个RDDOperationScope。RDDOperationScope与Stage或Job之间并无特殊关系,一个RDDOperationScope可以存在于一个Stage内,也可以跨越多个Job。
- numCachedPartitions:缓存的分区数量。
- memSize:使用的内存大小。
- diskSize:使用的磁盘大小。
- externalBlockStoreSize:Block存储在外部的大小(指不同结点)。
RDD还提供了以下的方法:
1. isCached
是否已经缓存。
def isCached: Boolean = (memSize + diskSize > 0) && numCachedPartitions > 0
2. compare
由于RDDInfo继承了Ordered,所以重写了compare方法用于排序。compare的代码如下:
override def compare(that: RDDInfo): Int = {this.id - that.id // 根据id的大小进行排序}
3. fromRdd
用于从RDD构建出相应的RDDInfo,其实现代码如下:
private[spark] object RDDInfo {def fromRdd(rdd: RDD[_]): RDDInfo = {val rddName = Option(rdd.name).getOrElse(Utils.getFormattedClassName(rdd)) // 获取rddName属性val parentIds = rdd.dependencies.map(_.rdd.id) // 获取当前RDD依赖的所有父RDD的身份标识作为RDDInfo的parentIds属性new RDDInfo(rdd.id, rddName, rdd.partitions.length,rdd.getStorageLevel, parentIds, rdd.creationSite.shortForm, rdd.scope) // 创建RDDInfo对象并返回}
}