网站后台生成静态页面,长安东莞网站设计,如何选择专业网站开发商,我要学做网站RDD相关知识
RDD介绍
RDD 是Spark的核心抽象#xff0c;即 弹性分布式数据集#xff08;residenta distributed dataset#xff09;。代表一个不可变#xff0c;可分区#xff0c;里面元素可并行计算的集合。其具有数据流模型的特点#xff1a;自动容错#xff0c;位置…RDD相关知识
RDD介绍
RDD 是Spark的核心抽象即 弹性分布式数据集residenta distributed dataset。代表一个不可变可分区里面元素可并行计算的集合。其具有数据流模型的特点自动容错位置感知性调度和可伸缩性。 在Spark中对数据的所有操作不外乎创建RDD、转化已有RDD以及调用 RDD操作进行求值。
RDD结构图 RDD具有五大特性 一组分片Partition即数据集的基本组成单位RDD是由一系列的partition组成的。将数据加载为RDD时一般会遵循数据的本地性一般一个HDFS里的block会加载为一个partition。 RDD之间的依赖关系。依赖还具体分为宽依赖和窄依赖但并不是所有的RDD都有依赖。为了容错重算cachecheckpoint也就是说在内存中的RDD操作时出错或丢失会进行重算。 由一个函数计算每一个分片。Spark中的RDD的计算是以分片为单位的每个RDD都会实现compute函数以达到这个目的。compute函数会对迭代器进行复合不需要保存每次计算的结果。 可选如果RDD里面存的数据是key-value形式则可以传递一个自定义的Partitioner进行重新分区。 可选RDD提供一系列最佳的计算位置即数据的本地性。
RDD之间的依赖关系
RDD之间有一系列的依赖关系依赖关系又分为窄依赖和宽依赖。
窄依赖父RDD和子RDD partition之间的关系是一对一的。或者父RDD一个partition只对应一个子RDD的partition情况下的父RDD和子RDD partition关系是多对一的也可以理解为没有触发shuffle。
宽依赖父RDD与子RDD partition之间的关系是一对多。 父RDD的一个分区的数据去到子RDD的不同分区里面。也可以理解为触发了shuffle。
特别说明对于join操作有两种情况如果join操作的使用每个partition仅仅和已知的Partition进行join此时的join操作就是窄依赖其他情况的join操作就是宽依赖。 RDD创建 从Hadoop文件系统或与Hadoop兼容的其他持久化存储系统如Hive、Cassandra、HBase输入例如HDFS创建。 通过集合进行创建。 算子
算子可以分为Transformation 转换算子和Action 行动算子。 RDD是懒执行的如果没有行动操作出现所有的转换操作都不会执行。
RDD直观图如下 RDD 的 五大特性 一组分片Partition即数据集的基本组成单位。对于RDD来说每个分片都会被一个计算任务处理并决定并行计算的粒度。用户可以在创建RDD时指定RDD的分片个数如果没有指定那么就会采用默认值。默认值就是程序所分配到的CPU Core的数目。 一个计算每个分区的函数。Spark中RDD的计算是以分片为单位的每个 RDD都会实现 compute 函数以达到这个目的。compute函数会对迭代器进行复合不需要保存每次计算的结果。 RDD之间的依赖关系。RDD的每次转换都会生成一个新的RDD所以RDD之间就会形成类似于流水线一样的前后依赖关系。在部分分区数据丢失时Spark 可以通过这个依赖关系重新计算丢失的分区数据而不是对RDD的所有分区进行重新计算。 一个Partitioner即RDD的分片函数。当前Spark中实现了两种类型的分片函数一个是基于哈希的HashPartitioner另外一个是基于范围的 RangePartitioner。只有对于于key-value的RDD才会有Partitioner非key-value的RDD的Parititioner的值是None。Partitioner函数不但决定了RDD本身的分片数量也决定了parent RDD Shuffle输出时的分片数量。 一个列表存储存取每个Partition的优先位置preferred location。对于一个HDFS文件来说这个列表保存的就是每个 Partition 所在的块的位置。按照“移动数据不如移动计算”的理念Spark 在进行任务调度的时候会尽可能地将计算任务分配到其所要处理数据块的存储位置。
相关API介绍
SparkContext创建
sc SparkContext(local, Simple App)
说明local 是指让Spark程序本地运行Simple App 是指Spark程序的名称这个名称可以任意为了直观明了的查看最好设置有意义的名称。
集合并行化创建RDD
data [1,2,3,4]rdd sc.parallelize(data)
collect算子在驱动程序中将数据集的所有元素作为数组返回注意数据集不能过大
rdd.collect()
停止SparkContext。
sc.stop()
# -*- coding: UTF-8 -*-
from pyspark import SparkContextif __name__ __main__:#********** Begin **********## 1.初始化 SparkContext该对象是 Spark 程序的入口sc SparkContext(local, Simple App)# 2.创建一个1到8的列表Listdata [1, 2, 3, 4, 5, 6, 7, 8]# 3.通过 SparkContext 并行化创建 rddrdd sc.parallelize(data)# 4.使用 rdd.collect() 收集 rdd 的内容。 rdd.collect() 是 Spark Action 算子在后续内容中将会详细说明主要作用是收集 rdd 的数据内容result rdd.collect()# 5.打印 rdd 的内容print(result)# 6.停止 SparkContextsc.stop()#********** End **********#
读取外部数据集创建RDD
编写读取本地文件创建Spark RDD的程序。
相关知识
为了完成本关任务你需要掌握1.如何读取本地文件系统中的文件来创建Spark RDD。
textFile 介绍
PySpark可以从Hadoop支持的任何存储源创建分布式数据集包括本地文件系统HDFSCassandraHBaseAmazon S3 等。Spark支持文本文件SequenceFiles和任何其他Hadoop InputFormat。
文本文件RDD可以使用创建SparkContex的textFile方法。此方法需要一个 URI的文件本地路径的机器上或一个hdfs://s3a:// 等 URI并读取其作为行的集合。这是一个示例调用
distFile sc.textFile(data.txt) # -*- coding: UTF-8 -*-
from pyspark import SparkContextif __name__ __main__:#********** Begin **********## 1.初始化 SparkContext该对象是 Spark 程序的入口sc SparkContext(local, Simple App)# 文本文件 RDD 可以使用创建 SparkContext 的t extFile 方法。
#此方法需要一个 URI的 文件本地路径的机器上或一个hdfs://s3a://等URI
#并读取其作为行的集合# 2.读取本地文件URI为/root/wordcount.txtrdd sc.textFile(/root/wordcount.txt)# 3.使用 rdd.collect() 收集 rdd 的内容。
#rdd.collect() 是 Spark Action 算子在后续内容中将会详细说明主要作用是收集 rdd 的数据内容result rdd.collect()# 4.打印 rdd 的内容print(result)# 5.停止 SparkContextsc.stop()#********** End **********#
map 算子
本关任务使用Spark的 map 算子按照相关需求完成转换操作。
相关知识
为了完成本关任务你需要掌握如何使用map算子。
map
将原来RDD的每个数据项通过map中的用户自定义函数 f 映射转变为一个新的元素。 图中每个方框表示一个RDD 分区左侧的分区经过自定义函数 f:T-U 映射为右侧的新 RDD 分区。但是实际只有等到 Action 算子触发后这个 f 函数才会和其他函数在一个 Stage 中对数据进行运算。
map 案例 sc SparkContext(local, Simple App)
data [1,2,3,4,5,6]
rdd sc.parallelize(data)
print(rdd.collect())
rdd_map rdd.map(lambda x: x * 2)
print(rdd_map.collect())
输出
[1, 2, 3, 4, 5, 6] [2, 4, 6, 8, 10, 12] 说明rdd1 的元素 1 , 2 , 3 , 4 , 5 , 6 经过 map 算子( x - x*2 )转换成了 rdd2 ( 2 , 4 , 6 , 8 , 10 )。
编程要求
请仔细阅读右侧代码根据方法内的提示在Begin - End区域内进行代码补充具体任务如下
需求使用 map 算子将rdd的数据 (1, 2, 3, 4, 5) 按照下面的规则进行转换操作规则如下:
偶数转换成该数的平方奇数转换成该数的立方。
# -*- coding: UTF-8 -*-
from pyspark import SparkContextif __name__ __main__:#********** Begin **********## 1.初始化 SparkContext该对象是 Spark 程序的入口sc SparkContext(local, Simple App)# 2.创建一个1到5的列表Listdata [1, 2, 3, 4, 5]# 3.通过 SparkContext 并行化创建 rddrdd sc.parallelize(data)# 4.使用rdd.collect() 收集 rdd 的元素。print(rdd.collect())使用 map 算子将 rdd 的数据 (1, 2, 3, 4, 5) 按照下面的规则进行转换操作规则如下:需求偶数转换成该数的平方奇数转换成该数的立方# 5.使用 map 算子完成以上需求rdd_map rdd.map(lambda x: x * x if x % 2 0 else x * x * x)# 6.使用rdd.collect() 收集完成 map 转换的元素print(rdd_map.collect())# 7.停止 SparkContextsc.stop()#********** End **********#
mapPartitions算子
mapPartitions
mapPartitions函数获取到每个分区的迭代器在函数中通过这个分区整体的迭 代器对整个分区的元素进行操作。 图中每个方框表示一个RDD分区左侧的分区经过自定义函数 f:T-U 映射为右侧的新RDD分区。
mapPartitions 与 map
map遍历算子可以遍历RDD中每一个元素遍历的单位是每条记录。
mapPartitions遍历算子可以改变RDD格式会提高RDD并行度遍历单位是Partition也就是在遍历之前它会将一个Partition的数据加载到内存中。
那么问题来了用上面的两个算子遍历一个RDD谁的效率高 mapPartitions算子效率高。
mapPartitions 案例 def f(iterator):
list []
for x in iterator:
list.append(x*2)
return listif __name__ __main__:
sc SparkContext(local, Simple App)
data [1,2,3,4,5,6]
rdd sc.parallelize(data)
print(rdd.collect())
partitions rdd.mapPartitions(f)
print(partitions.collect())
输出 [1, 2, 3, 4, 5, 6]
[2, 4, 6, 8, 10, 12] mapPartitions()传入的参数是rdd的 iterator元素迭代器返回也是一个iterator迭代器。
# -*- coding: UTF-8 -*-
from pyspark import SparkContext#********** Begin **********#
def f(iterator):list []for x in iterator:list.append((x, len(x)))return list#********** End **********#
if __name__ __main__:#********** Begin **********## 1.初始化 SparkContext该对象是 Spark 程序的入口sc SparkContext(local, Simple App)# 2. 一个内容为dog, salmon, salmon, rat, elephant的列表Listdata [dog, salmon, salmon, rat, elephant]# 3.通过 SparkContext 并行化创建 rddrdd sc.parallelize(data)# 4.使用rdd.collect() 收集 rdd 的元素。print(rdd.collect())使用 mapPartitions 算子将 rdd 的数据 (dog, salmon, salmon, rat, elephant) 按照下面的规则进行转换操作规则如下:需求将字符串与该字符串的长度组合成一个元组例如dog -- (dog,3)salmon -- (salmon,6)# 5.使用 mapPartitions 算子完成以上需求partitions rdd.mapPartitions(f)# 6.使用rdd.collect() 收集完成 mapPartitions 转换的元素print(partitions.collect())# 7.停止 SparkContextsc.stop()#********** End **********#
filter算子。
filter
filter 函数功能是对元素进行过滤对每个元素应用f函数返 回值为 true的元素在RDD中保留返回值为false的元素将被过滤掉。内部实现相当于生成。
FilteredRDD(thissc.clean(f))
下面代码为函数的本质实现 def filter(self, f):Return a new RDD containing only the elements that satisfy a predicate. rdd sc.parallelize([1, 2, 3, 4, 5])rdd.filter(lambda x: x % 2 0).collect()
[2, 4]def func(iterator):
return filter(fail_on_stopiteration(f), iterator)
return self.mapPartitions(func, True) 上图中每个方框代表一个 RDD 分区 T 可以是任意的类型。通过用户自定义的过滤函数 f对每个数据项操作将满足条件、返回结果为 true 的数据项保留。例如过滤掉 V2 和 V3 保留了 V1为区分命名为 V’1。
filter 案例 sc SparkContext(local, Simple App)
data [1,2,3,4,5,6]
rdd sc.parallelize(data)
print(rdd.collect())
rdd_filter rdd.filter(lambda x: x2)
print(rdd_filter.collect())
输出
[1, 2, 3, 4, 5, 6][3, 4, 5, 6] 说明rdd1( [ 1 , 2 , 3 , 4 , 5 , 6 ] ) 经过 filter 算子转换成 rdd2( [ 3 ,4 , 5 , 6 ] )。
使用 filter 算子将 rdd 中的数据 (1, 2, 3, 4, 5, 6, 7, 8) 按照以下规则进行过滤规则如下:
过滤掉rdd中的所有奇数。
# -*- coding: UTF-8 -*-
from pyspark import SparkContextif __name__ __main__:#********** Begin **********## 1.初始化 SparkContext该对象是 Spark 程序的入口sc SparkContext(local, Simple App)# 2.创建一个1到8的列表Listdata [1, 2, 3, 4, 5, 6, 7, 8]# 3.通过 SparkContext 并行化创建 rddrdd sc.parallelize(data)# 4.使用rdd.collect() 收集 rdd 的元素。print(rdd.collect())使用 filter 算子将 rdd 的数据 (1, 2, 3, 4, 5, 6, 7, 8) 按照下面的规则进行转换操作规则如下:需求过滤掉rdd中的奇数# 5.使用 filter 算子完成以上需求rdd_filter rdd.filter(lambda x: x % 2 0)# 6.使用rdd.collect() 收集完成 filter 转换的元素print(rdd_filter.collect())# 7.停止 SparkContextsc.stop()#********** End **********#
flatMap算子
flatMap
将原来RDD中的每个元素通过函数f转换为新的元素并将生成的RDD中每个集合的元素合并为一个集合内部创建
FlatMappedRDD(thissc.clean(f)) 上图表示RDD的一个分区进行flatMap函数操作flatMap中传入的函数为f:T-UT和U可以是任意的数据类型。将分区中的数据通过用户自定义函数f转换为新的数据。外部大方框可以认为是一个RDD分区小方框代表一个集合。V1、V2、V3在一个集合作为RDD的一个数据项可能存储为数组或其他容器转换为V’1、V’2、V’3后将原来的数组或容器结合拆散拆散的数据形成RDD中的数据项。
flatMap 案例
sc SparkContext(local, Simple App)
data [[m], [a, n]]
rdd sc.parallelize(data)
print(rdd.collect())
flat_map rdd.flatMap(lambda x: x)
print(flat_map.collect())
输出
[[m], [a, n]][m, a, n] flatMap将两个集合转换成一个集合
需求使用 flatMap 算子将rdd的数据 ([1, 2, 3], [4, 5, 6], [7, 8, 9]) 按照下面的规则进行转换操作规则如下:
合并RDD的元素例如 ([1,2,3],[4,5,6]) -- (1,2,3,4,5,6)([2,3],[4,5],[6]) -- (1,2,3,4,5,6) from pyspark import SparkContextif __name__ __main__:#********** Begin **********## 1.初始化 SparkContext该对象是 Spark 程序的入口sc SparkContext(local, Simple App)# 2.创建一个[[1, 2, 3], [4, 5, 6], [7, 8, 9]] 的列表Listlist [[1, 2, 3], [4, 5, 6], [7, 8, 9]]# 3.通过 SparkContext 并行化创建 rddrdd sc.parallelize(list)# 4.使用rdd.collect() 收集 rdd 的元素。print(rdd.collect()) 使用 flatMap 算子将 rdd 的数据 ([1, 2, 3], [4, 5, 6], [7, 8, 9]) 按照下面的规则进行转换操作规则如下:需求合并RDD的元素例如([1,2,3],[4,5,6]) -- (1,2,3,4,5,6)([2,3],[4,5],[6]) -- (1,2,3,4,5,6)# 5.使用 filter 算子完成以上需求flat_map rdd.flatMap(lambda x: x)# 6.使用rdd.collect() 收集完成 filter 转换的元素print(flat_map.collect())# 7.停止 SparkContextsc.stop()#********** End **********#distinct 算子 distinct distinct 将 RDD 中的元素进行去重操作。 上图中的每个方框代表一个 RDD 分区通过 distinct 函数将数据去重。 例如重复数据 V1、 V1 去重后只保留一份 V1 。 distinct 案例 sc SparkContext(local, Simple App)
data [python, python, python, java, java]
rdd sc.parallelize(data)
print(rdd.collect())
distinct rdd.distinct() 输出 [python, python, python, java, java]
[python, java] print(distinct.collect()) sortByKey 算子 sortByKey def sortByKey(self, ascendingTrue, numPartitionsNone, keyfunclambda x: x):
if numPartitions is None:
numPartitions self._defaultReducePartitions()memory self._memory_limit()
serializer self._jrdd_deserializerdef sortPartition(iterator):
sort ExternalSorter(memory * 0.9, serializer).sorted
return iter(sort(iterator, keylambda kv: keyfunc(kv[0]), reverse(not ascending)))if numPartitions 1:
if self.getNumPartitions() 1:
self self.coalesce(1)
return self.mapPartitions(sortPartition, True)# first compute the boundary of each part via sampling: we want to partition
# the key-space into bins such that the bins have roughly the same
# number of (key, value) pairs falling into them
rddSize self.count()
if not rddSize:
return self # empty RDD
maxSampleSize numPartitions * 20.0 # constant from Sparks RangePartitioner
f\fraction min(maxSampleSize / max(rddSize, 1), 1.0)
samples self.sample(False, f\fraction, 1).map(lambda kv: kv[0]).collect()
samples sorted(samples, keykeyfunc)# we have numPartitions many parts but one of the them has
# an implicit boundary
bounds [samples[int(len(samples) * (i 1) / numPartitions)]
for i in range(0, numPartitions - 1)]def rangePartitioner(k):
p bisect.bisect_left(bounds, keyfunc(k))
if ascending:
return p
else:
return numPartitions - 1 - preturn self.partitionBy(numPartitions, rangePartitioner).mapPartitions(sortPartition, True) 说明ascending参数是指排序升序还是降序默认是升序。numPartitions参数是重新分区默认与上一个RDD保持一致。keyfunc参数是排序规则。 sortByKey 案例 sc SparkContext(local, Simple App)data [(a,1),(a,2),(c,1),(b,1)]rdd sc.parallelize(data)key rdd.sortByKey()print(key.collect()) 输出 [(a, 1), (a, 2), (b, 1), (c, 1)] 需求使用 sortBy 算子将 rdd 中的数据进行排序升序。
from pyspark import SparkContextif __name__ __main__:# ********** Begin **********## 1.初始化 SparkContext该对象是 Spark 程序的入口sc SparkContext(local, Simple App)# 2.创建一个内容为[(B,1),(A,2),(C,3)]的列表ListList [(B,1),(A,2),(C,3)]# 3.通过 SparkContext 并行化创建 rddrdd sc.parallelize(List)# 4.使用rdd.collect() 收集 rdd 的元素print(rdd.collect())使用 sortByKey 算子将 rdd 的数据 (B, 1), (A, 2), (C, 3) 按照下面的规则进行转换操作规则如下:需求元素排序例如[(3,3),(2,2),(1,1)] -- [(1,1),(2,2),(3,3)]# 5.使用 sortByKey 算子完成以上需求key rdd.sortByKey()# 6.使用rdd.collect() 收集完成 sortByKey 转换的元素print(key.collect())# 7.停止 SparkContextsc.stop()# ********** End **********#
mapValues 算子
mapValues
mapValues 针对Key Value型数据中的 Value 进行 Map 操作而不对 Key 进行处理。 上图中的方框代表 RDD 分区。 aa2 代表对 (V1,1) 这样的 Key Value 数据对数据只对 Value 中的 1 进行加 2 操作返回结果为 3。
mapValues 案例 sc SparkContext(local, Simple App)
data [(a,1),(a,2),(b,1)]
rdd sc.parallelize(data)
values rdd.mapValues(lambda x: x 2)
print(values.collect())
输出 [(a, 3), (a, 4), (b, 3)] 需求使用mapValues算子将rdd的数据 (1, 1), (2, 2), (3, 3), (4, 4), (5, 5) 按照下面的规则进行转换操作规则如下:
偶数转换成该数的平方奇数转换成该数的立方 from pyspark import SparkContextif __name__ __main__:# ********** Begin **********## 1.初始化 SparkContext该对象是 Spark 程序的入口sc SparkContext(local, Simple App) # 2.创建一个内容为[(1, 1), (2, 2), (3, 3), (4, 4), (5, 5)]的列表ListList [(1, 1), (2, 2), (3, 3), (4, 4), (5, 5)]# 3.通过 SparkContext 并行化创建 rddrdd sc.parallelize(List)# 4.使用rdd.collect() 收集 rdd 的元素print(rdd.collect())使用 mapValues 算子将 rdd 的数据 (1, 1), (2, 2), (3, 3), (4, 4), (5, 5) 按照下面的规则进行转换操作规则如下:需求元素key,value的value进行以下操作偶数转换成该数的平方奇数转换成该数的立方# 5.使用 mapValues 算子完成以上需求values rdd.mapValues(lambda x: x 2)# 6.使用rdd.collect() 收集完成 mapValues 转换的元素print(values.collect())# 7.停止 SparkContextsc.stop()# ********** End **********#reduceByKey 算子 reduceByKey reduceByKey 算子只是两个值合并成一个值比如叠加。 函数实现 def reduceByKey(self, func, numPartitionsNone, partitionFuncportable_hash):
return self.combineByKey(lambda x: x, func, func, numPartitions, partitionFunc)
上图中的方框代表 RDD 分区。通过自定义函数 (A,B) (A B) 将相同 key 的数据 (V1,2) 和 (V1,1) 的 value 做加法运算结果为 V1,3。
reduceByKey 案例
sc SparkContext(local, Simple App)
data [(a,1),(a,2),(b,1)]
rdd sc.parallelize(data)
print(rdd.reduceByKey(lambda x,y:xy).collect())
输出
[(a, 3), (b, 1)] 需求使用 reduceByKey 算子将 rdd(key-value类型) 中的数据进行值累加。
例如
(soma,4), (soma,1) (soma,2 - (soma,7)
from pyspark import SparkContextif __name__ __main__:# ********** Begin **********## 1.初始化 SparkContext该对象是 Spark 程序的入口sc SparkContext(local, Simple App) # 2.创建一个内容为[(python, 1), (scala, 2), (python, 3), (python, 4), (java, 5)]的列表ListList [(python, 1), (scala, 2), (python, 3), (python, 4), (java, 5)]# 3.通过 SparkContext 并行化创建 rddrdd sc.parallelize(List) # 4.使用rdd.collect() 收集 rdd 的元素print(rdd.collect())使用 reduceByKey 算子将 rdd 的数据[(python, 1), (scala, 2), (python, 3), (python, 4), (java, 5)] 按照下面的规则进行转换操作规则如下:需求元素key-value的value累加操作例如(1,1),(1,1),(1,2) -- (1,4)(1,1),(1,1),(2,2),(2,2) -- (1,2),(2,4)# 5.使用 reduceByKey 算子完成以上需求reduce rdd.reduceByKey(lambda x,y:xy)# 6.使用rdd.collect() 收集完成 reduceByKey 转换的元素print(reduce.collect())# 7.停止 SparkContextsc.stop()# ********** End **********#
Action 的常用算子
count
count()返回 RDD 的元素个数。
示例 sc SparkContext(local, Simple App)
data [python, python, python, java, java]
rdd sc.parallelize(data)
print(rdd.count())
输出
5
first
first()返回 RDD 的第一个元素类似于take(1)。
示例 sc SparkContext(local, Simple App)
data [python, python, python, java, java]
rdd sc.parallelize(data)
print(rdd.first())
输出
python
take
take(n)返回一个由数据集的前 n 个元素组成的数组。
示例 sc SparkContext(local, Simple App)
data [python, python, python, java, java]
rdd sc.parallelize(data)
print(rdd.take(2))
输出
[python, python]
reduce
reduce()通过func函数聚集 RDD 中的所有元素该函数应该是可交换的和关联的以便可以并行正确计算。
示例 sc SparkContext(local, Simple App)
data [1,1,1,1]
rdd sc.parallelize(data)
print(rdd.reduce(lambda x,y:xy))
输出
4
collect
collect()在驱动程序中以数组的形式返回数据集的所有元素。
示例 sc SparkContext(local, Simple App)
data [1,1,1,1]
rdd sc.parallelize(data)
print(rdd.collect())
输出
[1,1,1,1]
具体任务如下
需求1使用 count 算子统计下 rdd 中元素的个数
需求2使用 first 算子获取 rdd 首个元素
需求3使用 take 算子获取 rdd 前三个元素
需求4使用 reduce 算子进行累加操作
需求5使用 collect 算子收集所有元素。
from pyspark import SparkContext
if __name__ __main__:# ********** Begin **********## 1.初始化 SparkContext该对象是 Spark 程序的入口sc SparkContext(local, Simple App)# 2.创建一个内容为[1, 3, 5, 7, 9, 8, 6, 4, 2]的列表ListList [1, 3, 5, 7, 9, 8, 6, 4, 2] # 3.通过 SparkContext 并行化创建 rddrdd sc.parallelize(List)# 4.收集rdd的所有元素并print输出print(rdd.collect())# 5.统计rdd的元素个数并print输出print(rdd.count())# 6.获取rdd的第一个元素并print输出print(rdd.first())# 7.获取rdd的前3个元素并print输出print(rdd.take(3))# 8.聚合rdd的所有元素并print输出print(rdd.reduce(lambda x,y:xy))# 9.停止 SparkContextsc.stop()# ********** End **********#