国外自适应网站模版,免费咨询法律,app开发设计方案怎么写,许昌网站建设公司Spark 的介绍与搭建#xff1a;从理论到实践_spark环境搭建-CSDN博客
Spark 的Standalone集群环境安装与测试-CSDN博客
PySpark 本地开发环境搭建与实践-CSDN博客
Spark 程序开发与提交#xff1a;本地与集群模式全解析-CSDN博客
Spark on YARN#xff1a;Spark集群模式…
Spark 的介绍与搭建从理论到实践_spark环境搭建-CSDN博客
Spark 的Standalone集群环境安装与测试-CSDN博客
PySpark 本地开发环境搭建与实践-CSDN博客
Spark 程序开发与提交本地与集群模式全解析-CSDN博客
Spark on YARNSpark集群模式之Yarn模式的原理、搭建与实践-CSDN博客
Spark 中 RDD 的诞生原理、操作与分区规则-CSDN博客
Spark 中的 RDD 分区的设定规则与高阶函数、Lambda 表达式详解-CSDN博客
RDD 算子全面解析从基础到进阶与面试要点-CSDN博客
PySpark 数据处理实战从基础操作到案例分析-CSDN博客
目录
一、Spark 容错机制概述
1、各个软件为了防止数据丢失的解决方案
2、Spark如何保障数据的安全
二、RDD 持久化机制
一cache 算子
二persist 算子
三unpersist 算子
四示例代码分析
三、RDD 检查点机制
一功能与原理
二适用场景
四、RDD 的 cache、persist 持久化机制和 checkpoint 检查点机制的区别
五、将日志分析案例进行优化
六、总结 在大数据处理领域Spark 作为一款强大的分布式计算框架面临着数据丢失和性能优化的双重挑战。为了确保数据的安全性和处理效率Spark 构建了一套完善的容错机制。本文将深入探讨 Spark 的容错机制包括 RDD 的持久化机制persist 和 cache 算子以及检查点机制checkpoint并分析它们的特点、适用场景以及相互之间的区别。 一、Spark 容错机制概述 1、各个软件为了防止数据丢失的解决方案 操作日志 将内存变化操作日志追加记录在一个文件中下一次读取文件对内存重新操作 - NAMENODE元数据的操作日志记录在edits - MySQL日志记录binlog () 副本机制 将数据构建多份冗余副本 - HDFS构建每个数据块的3个副本 依赖关系 每份数据保留与其他数据之间的一个转换关系 - RDD保留RDD与其他RDD之间的依赖关系 2、Spark如何保障数据的安全 每个RDD在构建数据时会根据自己来源一步步导到数据来源然后再一步步开始构建RDD数据。
问题如果一个RDD被触发多次这个RDD就会按照依赖关系被构建多次性能相对较差怎么解决
例如日志分析的时候三个问题tupleRdd 之前的所有操作都要执行三次每次读取100M多的数据效率非常的低
第一次一定会通过血脉构建这个RDD的数据希望从第二次开始就不要重复构建直接使用第一个构建的内容实现Spark持久化机制主动将RDD进行保存供多次使用避免重复构建 二、RDD 持久化机制 一cache 算子 功能将 RDD 缓存在内存中以便后续多次使用时无需重新计算。语法cache()。本质底层实际调用的是 persist(StorageLevel.MEMORY_ONLY)即只尝试将 RDD 缓存在内存。但如果内存资源不足缓存操作可能会失败。场景适用于资源充足且确定 RDD 只需在内存中缓存的情况例如对于一些频繁使用且数据量较小能够完全容纳在内存中的 RDD可以使用 cache 算子提高数据读取速度。 二persist 算子 功能能够将 RDD包含其依赖关系进行缓存并且可以根据需求自行指定缓存的级别这是它与 cache 算子的主要区别。语法persist(StorageLevel)。级别 将 RDD 缓存在磁盘中 StorageLevel.DISK_ONLY StorageLevel(True, False, False, False)将庞大且暂时不急需使用的 RDD 放入磁盘释放 Executor 内存。StorageLevel.DISK_ONLY_2 StorageLevel(True, False, False, False, 2)在磁盘中多存储一个缓存副本提高数据的冗余性和可用性。StorageLevel.DISK_ONLY_3 StorageLevel(True, False, False, False, 3)类似地存储三个副本。 StorageLevel.DISK_ONLY StorageLevel(True, False, False, False)
StorageLevel.DISK_ONLY_2 StorageLevel(True, False, False, False, 2)
StorageLevel.DISK_ONLY_3 StorageLevel(True, False, False, False, 3) 将 RDD 缓存在内存中 StorageLevel.MEMORY_ONLY StorageLevel(False, True, False, False)仅使用内存进行缓存常用于高频使用且数据量不大能适应内存容量的 RDD。StorageLevel.MEMORY_ONLY_2 StorageLevel(False, True, False, False, 2)增加一个内存缓存副本。 StorageLevel.MEMORY_ONLY StorageLevel(False, True, False, False)
StorageLevel.MEMORY_ONLY_2 StorageLevel(False, True, False, False, 2) 将 RDD 优先缓存在内存中如果内存不足就缓存在磁盘中 StorageLevel.MEMORY_AND_DISK StorageLevel(True, True, False, False)对于高频使用的大 RDD 较为合适先利用内存缓存当内存空间不足时自动将多余数据溢出到磁盘。StorageLevel.MEMORY_AND_DISK_2 StorageLevel(True, True, False, False, 2)多一个缓存副本。 StorageLevel.MEMORY_AND_DISK StorageLevel(True, True, False, False)
StorageLevel.MEMORY_AND_DISK_2 StorageLevel(True, True, False, False, 2)使用堆外内存StorageLevel.OFF_HEAP StorageLevel(True, True, True, False, 1)提供了除内存和磁盘之外的存储选择可利用堆外内存资源。 StorageLevel.OFF_HEAP StorageLevel(True, True, True, False, 1)使用序列化StorageLevel.MEMORY_AND_DISK_DESER StorageLevel(True, True, False, True)在内存和磁盘缓存时采用序列化方式有助于减少内存占用但在读取时需要进行反序列化操作会有一定性能开销。 StorageLevel.MEMORY_AND_DISK_DESER StorageLevel(True, True, False, True) 场景根据实际的资源状况灵活地将 RDD 缓存在不同的存储介质中或者设置多个缓存副本以平衡内存使用、数据读取速度和数据安全性。例如在内存和磁盘资源都较为充裕但内存使用较为紧张的情况下可以选择 MEMORY_AND_DISK 级别进行缓存既能保证数据的快速读取又能在内存不足时利用磁盘空间。 总结Spark的StorageLevel共有9个缓存级别 DISK_ONLY缓存入硬盘。这个级别主要是讲那些庞大的Rdd之后仍需使用但暂时不用的放进磁盘腾出Executor内存。 DISK_ONLY_2多一个缓存副本。 MEMORY_ONLY只使用内存进行缓存。这个级别最为常用对于马上用到的高频rdd推荐使用。 MEMORY_ONLY_2多一个缓存副本。 MEMORY_AND_DISK先使用内存多出来的溢出到磁盘对于高频的大rdd可以使用。 MEMORY_AND_DISK_2多一个缓存副本。 OFF_HEAP除了内存、磁盘还可以存储在OFF_HEAP 常用的 项目中经常使用 MEMORY_AND_DISK_2 MEMORY_AND_DISK_DESER 三unpersist 算子 功能释放已缓存的 RDD回收缓存占用的资源。语法unpersist还可以使用 unpersist(blockingTrue)表示等待 RDD 释放完资源后再继续执行下一步操作。场景当确定某个 RDD 不再被使用且后续还有大量代码需要执行时及时调用 unpersist 算子将其数据从缓存中释放避免资源的浪费。需要注意的是如果不手动释放缓存在 Spark 程序结束时系统也会自动清理该程序中的所有缓存内存。 四示例代码分析 以下是一个简单的 Spark 程序示例展示了如何使用 cache 、 persist 和 unpersist 算子 import os
import time# 导入pyspark模块
from pyspark import SparkContext, SparkConf
from pyspark.storagelevel import StorageLevelif __name__ __main__:# 配置环境os.environ[JAVA_HOME] C:/Program Files/Java/jdk1.8.0_241# 配置Hadoop的路径就是前面解压的那个路径os.environ[HADOOP_HOME] D:/hadoop-3.3.1# 配置base环境Python解析器的路径os.environ[PYSPARK_PYTHON] C:/ProgramData/Miniconda3/python.exe # 配置base环境Python解析器的路径os.environ[PYSPARK_DRIVER_PYTHON] C:/ProgramData/Miniconda3/python.exe# 获取 conf 对象# setMaster 按照什么模式运行local bigdata01:7077 yarn# local[2] 使用2核CPU * 你本地资源有多少核就用多少核# appName 任务的名字conf SparkConf().setMaster(local[*]).setAppName(spark的持久化机制)# 假如我想设置压缩# conf.set(spark.eventLog.compression.codec,snappy)# 根据配置文件得到一个SC对象第一个conf 是 形参的名字第二个conf 是实参的名字sc SparkContext(confconf)print(sc)fileRdd sc.textFile(../resources/1.dat)# cache 是转换算子#cacheRdd fileRdd.cache()cacheRdd fileRdd.persist(StorageLevel.MEMORY_AND_DISK_2)print(type(cacheRdd))cacheRdd.foreach(lambda x: print(x))time.sleep(20)cacheRdd.unpersist(blockingTrue)time.sleep(10)# 使用完后记得关闭sc.stop()# unpersist(blockingTrue)等RDD释放完再继续下一步
# blocking True阻塞 在上述代码中首先配置了 Spark 运行所需的环境变量然后创建了 SparkConf 和 SparkContext 对象。通过 textFile 方法读取文本文件创建了 fileRdd接着使用 persist 算子将其缓存到内存和磁盘并设置了两个副本。之后对缓存的 cacheRdd 进行了遍历操作模拟了对 RDD 的使用。在暂停 20 秒后调用 unpersist 算子释放缓存最后关闭 SparkContext。 三、RDD 检查点机制 一功能与原理 功能将 RDD 的数据不包含 RDD 依赖关系存储在可靠的存储系统如 HDFS中。可以将其类比为虚拟机中的快照作为数据处理过程中的一个重要里程碑。设置与使用 首先需要设置一个检查点目录例如sc.setCheckpointDir(../datas/chk/chk1)。然后对需要设置检查点的 RDD 调用 checkpoint() 方法如rs_rdd.checkpoint()。需要注意的是一定要在触发该 RDD 的算子之前调用 checkpoint() 方法否则检查点中可能没有数据。注意事项启用检查点机制后在代码执行过程中会专门多一个 job用于将 RDD 数据持久化存储到 HDFS 中。 二适用场景 适用于对 RDD 数据安全性要求极高但对性能要求相对不那么苛刻的场景。例如在一些数据处理任务中数据的准确性和完整性至关重要不容许因为任何故障导致数据丢失或错误即使这可能会带来一定的性能开销如在金融数据处理、关键业务数据分析等领域。 四、RDD 的 cache、persist 持久化机制和 checkpoint 检查点机制的区别
存储位置 persist可以将 RDD 缓存在内存或者磁盘中根据指定的缓存级别灵活选择存储介质。checkpoint将 RDD 的数据存储在文件系统磁盘通常是 HDFS中提供更可靠的持久化存储。生命周期 persist当代码中遇到了 unpersist 算子调用或者整个 Spark 程序结束时缓存会被自动清理释放资源。checkpoint检查点的数据不会被自动清理需要手动删除这使得数据在长时间内都能保持可用状态有利于数据的长期保存和回溯。存储内容 persist会保留 RDD 的血脉关系即与其他 RDD 的依赖关系。这样在缓存丢失时可以依据依赖关系重新构建 RDD恢复数据。checkpoint会斩断 RDD 的血脉关系仅存储 RDD 的数据本身。这意味着一旦检查点数据可用就不再依赖之前的 RDD 依赖链简化了数据恢复过程但也失去了基于依赖关系的灵活重建能力。 五、将日志分析案例进行优化 对前文的一个案例进行优化 import os
import re# 导入pyspark模块
from pyspark import SparkContext, SparkConf
import jieba
from pyspark.storagelevel import StorageLevelif __name__ __main__:# 配置环境os.environ[JAVA_HOME] D:/Program Files/Java/jdk1.8.0_271# 配置Hadoop的路径就是前面解压的那个路径os.environ[HADOOP_HOME] D:/hadoop-3.3.1/hadoop-3.3.1# 配置base环境Python解析器的路径os.environ[PYSPARK_PYTHON] C:/ProgramData/Miniconda3/python.exe # 配置base环境Python解析器的路径os.environ[PYSPARK_DRIVER_PYTHON] C:/ProgramData/Miniconda3/python.exe# 获取 conf 对象# setMaster 按照什么模式运行local bigdata01:7077 yarn# local[2] 使用2核CPU * 你本地资源有多少核就用多少核# appName 任务的名字conf SparkConf().setMaster(local[*]).setAppName(第一个Spark程序)# 假如我想设置压缩# conf.set(spark.eventLog.compression.codec,snappy)# 根据配置文件得到一个SC对象第一个conf 是 形参的名字第二个conf 是实参的名字sc SparkContext(confconf)fileRdd sc.textFile(../datas/sogou.tsv)print(fileRdd.count())print(fileRdd.first())listRdd fileRdd.map(lambda line: re.split(\\s, line))filterList listRdd.filter(lambda l1: len(l1) 6)# 这个结果只获取而来时间 uid 以及热词热词将左右两边的[] 去掉了tupleRdd filterList.map(lambda l1: (l1[0], l1[1], l1[2][1:-1]))# 将tupleRdd 缓存到内存中tupleRdd.cache()#tupleRdd.persist(storageLevelStorageLevel.MEMORY_AND_DISK)# 求热词wordRdd tupleRdd.flatMap(lambda t1: jieba.cut_for_search(t1[2]))filterRdd2 wordRdd.filter(lambda word: len(word.strip()) ! 0 and word ! 的).filter(lambda word: re.fullmatch([\u4e00-\u9fa5], word) is not None)# filterRdd2.foreach(print)result filterRdd2.map(lambda word: (word, 1)).reduceByKey(lambda sum, num: sum num).sortBy(keyfunclambda tup: tup[1], ascendingFalse).take(10)for ele in result:print(ele)# 第二问 (uid,功夫) 10)# [(time,uid,中华人民),()]def splitWord(tupl):li1 jieba.cut_for_search(tupl[2]) # 中国 中华 共和国li2 list()for word in li1:li2.append(((tupl[1], word),1))return li2newRdd tupleRdd.flatMap(splitWord)#newRdd.foreach(print)reduceByUIDAndWordRdd newRdd.reduceByKey(lambda sum,num : sum num)# reduceByUIDAndWordRdd.foreach(print)valList reduceByUIDAndWordRdd.values()print(valList.max())print(valList.min())print(valList.mean()) # 中位数print(valList.sum() / valList.count()) ## 第三问 统计一天每小时点击量并按照点击量降序排序reductByKeyRDD tupleRdd.map(lambda tup: (tup[0][0:2],1)).reduceByKey(lambda sum,num : sum num)sortRdd reductByKeyRDD.sortBy(keyfunclambda tup:tup[1],ascendingFalse)listNum sortRdd.take(24)for ele in listNum:print(ele)tupleRdd.unpersist(blockingTrue)# 使用完后记得关闭sc.stop()将任务运行运行过程中发现内存中存储了50M的缓存数据
适用场景RDD需要多次使用或者RDD是经过非常复杂的转换过程所构建。
一般缓存的RDD都是经过过滤经过转换之后重复利用的rdd,可以添加缓存否则不要加。 六、总结 Spark 的容错机制通过多种方式保障了数据处理的稳定性和高效性。RDD 的持久化机制包括 cache 和 persist 算子为频繁使用的 RDD 提供了灵活的缓存策略能够有效减少重复计算提高处理效率。而检查点机制则侧重于数据的高安全性存储在面对可能的缓存丢失或系统故障时确保数据的完整性和可用性。在实际应用中需要根据数据处理任务的特点、资源状况以及对数据安全性和性能的要求合理选择使用持久化机制和检查点机制以充分发挥 Spark 框架的优势构建可靠高效的大数据处理应用。