PySpark

一、概念

1.Spack是什么?
Apache Spark是用于大规模数据处理的统一分析引擎,是一款分布式的计算框架,用于调度成百上千的服务器集群,计算TB、PB乃至EB级别的海量数据。

2.PySpark是什么?
pyspark是用spark官方开发的python第三方库,可以使用pip程序快速安装,并像其他第三方库那样使用。PySpark可以作为Python库进行数据处理,提交至Spark集群进行分布式集群计算。

二、准备工作

1.安装PySpark
按win+r键,输入cmd打开命令提示符程序,输入
pip install pystark
或使用国内代理镜像站(清华大学源)
pip install -i https://pypi.tuna/tsinghua.edu.cn/simple pyspark
也可以在Pycharm里直接安装

2.除此之外,还需要安装java,地址:https://www.oracle.com/java/technologies/downloads/
配置环境:

变量名:JAVA_HOME
变量值:C:\Program Files (x86)\Java\jdk-20     // 要根据自己的实际路径配置

变量名:CLASSPATH
变量值:.;%JAVA_HOME%\lib\dt.jar;%JAVA_HOME%\lib\tools.jar;     

变量名:Path
变量值:%JAVA_HOME%\bin;%JAVA_HOME%\jre\bin;

重启Pycharm,选择Run—>Edit Configurations,添加JAVAHOME

3.测试是否安装成功

想要使用Pyspark库完成数据处理,首先需要构建一个执行环境入口对象
PySpark的执行环境入口对象是:类SparkContext的类对象

# 导包
from pyspark import SparkConf, SparkContext

# 创建SparkConf类对象

#conf=SparkConf()
#conf.setMaster("local[*]")  # setMaster()指定spark的运行模式,local指以单机模式运行在本机上
#conf.setAppName("test_spark_app") #指定名称

#链式调用的原则是调用的方法返回值都是同一个对象
conf = SparkConf().setMaster("local[*]").setAppName("test_spark_app")  

# 基于SparkConf类对象创建SparkContext类对象
sc = SparkContext(conf=conf)  #sc就是执行环境入口对象

# 打印PySpark的运行版本
print(sc.version)

# 停止SparkContext对象的运行(停止运行PyStark程序)
sc.stop()

运行代码,成功显示PySpark的运行版本,证明安装成功

三、PySpark的编程模型

SparkContext类对象,是PySpark编程中一切功能的入口

PySpark的编程,主要分为如下三大步骤:
1.数据输入
通过SparkContext类对象的成员方法完成数据的读取操作,读取后得到RDD类对象
2.数据处理计算
通过RDD类对象的成员方法,完成各种数据计算的需求
3.数据输出
将处理完成后的RDD对像调用各种成员方法完成写出文件,转换为list,tuple,dict等操作

RDD(Resilient Distributed Datasets),全称:弹性分布式数据集
RDD对象:PySpark支持多种数据的输入,在输入完成之后,都会得到一个RDD的对象。
PySpark针对数据的处理,都是以RDD对象作为载体,即:

  • 数据存储在RDD中内
  • 各类数据的计算方式也都是RDD的成员方法
  • RDD的数据计算方法,返回值依旧是RDD对象

四、数据输入

from pyspark import SparkConf, SparkContext

conf = SparkConf().setMaster("local[*]").setAppName("test_spark_app")
sc = SparkContext(conf=conf)

# 通过parallelize方法将Python对象加载到Spark内,成为RDD对象
rdd1 = sc.parallelize([1, 2, 3, 4, 5])
rdd2 = sc.parallelize((1, 2, 3, 4, 5))
rdd3 = sc.parallelize("PySpark")
rdd4 = sc.parallelize({1, 2, 3, 4, 5})
rdd5 = sc.parallelize({"key1": "value1", "key2": "value2"})

#读取文件转RDD对象
rdd6 = sc.textFile("C:/test.txt")

# 如果要查看RDD中有什么内容,需要用collect()方法
print(rdd1.collect())  # [1, 2, 3, 4, 5]
print(rdd2.collect())  # [1, 2, 3, 4, 5]
print(rdd3.collect())  # ['P', 'y', 'S', 'p', 'a', 'r', 'k']
print(rdd4.collect())  # [1, 2, 3, 4, 5]
print(rdd5.collect())  # ['key1', 'key2']
print(rdd6.collect())  # ['测试1', '', '测试2', '', '测试3']
sc.stop()

五、数据处理

RDD内置丰富的成员方法(算子)

map算子
功能:map算子是将RDD的数据一条条处理(处理的逻辑基于map算子中接受的处理函数),返回新的RDD

from pyspark import SparkConf, SparkContext
#Spark不能自动找到python解释器,需要指定
import os
os.environ['PYSPARK_PYTHON'] = 'C:/Users/tangling/venv/Scripts/python.exe'

conf = SparkConf().setMaster("local[*]").setAppName("test_spark_app")
sc = SparkContext(conf=conf)
rdd = sc.parallelize([1, 2, 3, 4, 5])

# 通过map方法将全部数据都乘10
def func(data):
    return data * 10

rdd2 = rdd.map(func)
print(rdd2.collect())    #[10, 20, 30, 40, 50]
# 链式调用
# 给rdd中的每一个元素乘10后再加5
rdd3=rdd.map(lambda x:x*10).map(lambda x:x+5)
print(rdd3.collect())	 #[15, 25, 35, 45, 55]
sc.stop()

flatmap算子
功能:对RDD执行map操作,然后进行解除嵌套操作(与map相比之多了一层解嵌套)
解除嵌套:
list=[[1,2,3],[4,5,6],[7,8,9]] ——> list=[1,2,3,4,5,6,7,8,9]

from pyspark import SparkConf,SparkContext
import os
os.environ['PYSPARK_PYTHON']='C:/Users/tangling/venv/Scripts/python.exe'

conf=SparkConf().setMaster("local[*]").setAppName("test_spark")
sc=SparkContext(conf=conf)
rdd=sc.parallelize(["python project","hello world"])

#需求:将RDD数据中的一个个单词提取出来
rdd2=rdd.flatMap(lambda x:x.split(" "))
print(rdd2.collect())  #['python', 'project', 'hello', 'world']
sc.close()

reduceByKey算子
功能:针对KV型RDD,自动按照Key分组,然后根据你提供的聚合逻辑,完成组内数据(value)的聚合操作

func:(V,V)—>V
接受2个传入参数(类型要一致),返回一个返回值,类型和传入要求一致

from pyspark import SparkConf, SparkContext
import os

os.environ['PYSPARK_PYTHON'] = 'C:/Users/tangling/venv/Scripts/python.exe'

conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
sc = SparkContext(conf=conf)
rdd = sc.parallelize([('男', 99), ('男', 88), ('男', 79), ('女', 97), ('女', 89)])

# 需求:求男生和女生两个组的成绩之和
result = rdd.reduceByKey(lambda a, b: a + b)
print(result.collect())	   #[('男', 266), ('女', 186)]

案例:统计指定文件的词频

from pyspark import SparkConf, SparkContext
import os

os.environ['PYSPARK_PYTHON'] = 'C:/Users/tangling/venv/Scripts/python.exe'
conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
sc = SparkContext(conf=conf)

rdd = sc.textFile('C:/test.txt')
#取出全部单词
words=rdd.flatMap(lambda x:x.split(" "))
#将所有单词都转化成二元元组,单词为key,value设置为1
word_with_one_rdd=words.map(lambda word:(word,1)) 
#分组求和
result=word_with_one_rdd.reduceByKey(lambda a,b:a+b)

print(result.collect())  #[('world', 1), ('i', 3), ('love', 2), ('yuanyuan', 1), ('pangpang', 1), ('am', 1), ('tangling', 1), ('hello', 1)]

Filter算子
功能:过滤想要的数据进行保留
func:(T)—>bool 传入一个参数进来类型随意,返回值必须是True或False,返回值是True的被留下来,False的数据被丢弃

from pyspark import SparkConf, SparkContext
import os

os.environ['PYSPARK_PYTHON'] = 'C:/Users/tangling/venv/Scripts/python.exe'
conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
sc = SparkContext(conf=conf)

rdd = sc.parallelize([1, 2, 3, 4, 5])
#只保留rdd中的偶数
result = rdd.filter(lambda num: num % 2 == 0)
print(result.collect())    #[2,4]

distinct算子
功能:对RDD数据进行去重,返回新的RDD,无需传参

rdd = sc.parallelize([1, 1,2,3,3,4])
result = rdd.distinct()
print(result.collect())   #[1,2,3,4]

sortBy算子
功能:对RDD数据进行排序,基于你指定的排序依据
语法:rdd.sortBy(func,ascending=False,numPartitions=1)
func:(T)—>U:告知按照RDD中的哪一个数据进行排序,比如lambda x:x[1]表示按照rdd中的第二列元素进行排序
ascending=True(升序)False(降序)
numPartitons:用多少分区排序,全局排序需要设置分区数为1

对之前案例中的结果进行排序

from pyspark import SparkConf, SparkContext
import os

os.environ['PYSPARK_PYTHON'] = 'C:/Users/tangling/venv/Scripts/python.exe'
conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
sc = SparkContext(conf=conf)

rdd = sc.textFile('C:/test.txt')
#取出全部单词
words=rdd.flatMap(lambda x:x.split(" "))
#将所有单词都转化成二元元组,单词为key,value设置为1
word_with_one_rdd=words.map(lambda word:(word,1)) 
#分组求和
result_rdd=word_with_one_rdd.reduceByKey(lambda a,b:a+b)
#对结果进行排序
final_rdd=result_rdd.sortBy(lambda x:x[1],ascending=False,numPartitions=1)

print(final_rdd.collect()) #[('i', 3), ('love', 2), ('world', 1), ('yuanyuan', 1), ('pangpang', 1), ('am', 1), ('tangling', 1), ('hello', 1)]

综合案例:结合所学知识,完成以下需求:
需求1:城市销售额排名
需求2:全部城市有哪些商品类别在售卖
需求3:北京有哪些商品类型在售卖

{“id”:1,“timestamp”:“2019-05-08T01:03.00Z”,“category”:“平板电脑”,“areaName”:“北京”,“money”:“1450”}|{“id”:2,“timestamp”:“2019-05-08T01:01.00Z”,“category”:“手机”,“areaName”:“北京”,“money”:“1450”}|{“id”:3,“timestamp”:“2019-05-08T01:03.00Z”,“category”:“手机”,“areaName”:“北京”,“money”:“8412”}
{“id”:4,“timestamp”:“2019-05-08T05:01.00Z”,“category”:“电脑”,“areaName”:“上海”,“money”:“1513”}|{“id”:5,“timestamp”:“2019-05-08T01:03.00Z”,“category”:“家电”,“areaName”:“北京”,“money”:“1550”}|{“id”:6,“timestamp”:“2019-05-08T01:01.00Z”,“category”:“电脑”,“areaName”:“杭州”,“money”:“1550”}
{“id”:7,“timestamp”:“2019-05-08T01:03.00Z”,“category”:“电脑”,“areaName”:“北京”,“money”:“5611”}|{“id”:8,“timestamp”:“2019-05-08T03:01.00Z”,“category”:“家电”,“areaName”:“北京”,“money”:“4410”}|{“id”:9,“timestamp”:“2019-05-08T01:03.00Z”,“category”:“家具”,“areaName”:“郑州”,“money”:“1120”}
{“id”:10,“timestamp”:“2019-05-08T01:01.00Z”,“category”:“家具”,“areaName”:“北京”,“money”:“6661”}|{“id”:11,“timestamp”:“2019-05-08T05:03.00Z”,“category”:“家具”,“areaName”:“杭州”,“money”:“1230”}|{“id”:12,“timestamp”:“2019-05-08T01:01.00Z”,“category”:“书籍”,“areaName”:“北京”,“money”:“5550”}
{“id”:13,“timestamp”:“2019-05-08T01:03.00Z”,“category”:“书籍”,“areaName”:“北京”,“money”:“5550”}|{“id”:14,“timestamp”:“2019-05-08T01:01.00Z”,“category”:“电脑”,“areaName”:“北京”,“money”:“1261”}|{“id”:15,“timestamp”:“2019-05-08T03:03.00Z”,“category”:“电脑”,“areaName”:“杭州”,“money”:“6660”}
{“id”:16,“timestamp”:“2019-05-08T01:01.00Z”,“category”:“电脑”,“areaName”:“天津”,“money”:“6660”}|{“id”:17,“timestamp”:“2019-05-08T01:03.00Z”,“category”:“书籍”,“areaName”:“北京”,“money”:“9000”}|{“id”:18,“timestamp”:“2019-05-08T05:01.00Z”,“category”:“书籍”,“areaName”:“北京”,“money”:“1230”}
{“id”:19,“timestamp”:“2019-05-08T01:03.00Z”,“category”:“电脑”,“areaName”:“杭州”,“money”:“5551”}|{“id”:20,“timestamp”:“2019-05-08T01:01.00Z”,“category”:“电脑”,“areaName”:“北京”,“money”:“2450”}
{“id”:21,“timestamp”:“2019-05-08T01:03.00Z”,“category”:“食品”,“areaName”:“北京”,“money”:“5520”}|{“id”:22,“timestamp”:“2019-05-08T01:01.00Z”,“category”:“食品”,“areaName”:“北京”,“money”:“6650”}
{“id”:23,“timestamp”:“2019-05-08T01:03.00Z”,“category”:“服饰”,“areaName”:“杭州”,“money”:“1240”}|{“id”:24,“timestamp”:“2019-05-08T01:01.00Z”,“category”:“食品”,“areaName”:“天津”,“money”:“5600”}
{“id”:25,“timestamp”:“2019-05-08T01:03.00Z”,“category”:“食品”,“areaName”:“北京”,“money”:“7801”}|{“id”:26,“timestamp”:“2019-05-08T01:01.00Z”,“category”:“服饰”,“areaName”:“北京”,“money”:“9000”}
{“id”:27,“timestamp”:“2019-05-08T01:03.00Z”,“category”:“服饰”,“areaName”:“杭州”,“money”:“5600”}|{“id”:28,“timestamp”:“2019-05-08T01:01.00Z”,“category”:“食品”,“areaName”:“北京”,“money”:“8000”}|{“id”:29,“timestamp”:“2019-05-08T02:03.00Z”,“category”:“服饰”,“areaName”:“杭州”,“money”:“7000”}

from pyspark import SparkConf, SparkContext
import json
import os

os.environ['PYSPARK_PYTHON'] = 'C:/Users/tangling/venv/Scripts/python.exe'
conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
sc = SparkContext(conf=conf)
# TODO 需求1:城市销售额排名
file_rdd = sc.textFile('C:/orders.txt')
# 取出一个个json字符串
json_str_rdd = file_rdd.flatMap(lambda x: x.split("|"))
# 将json字符串都转化成字典
dict_rdd = json_str_rdd.map(lambda x: json.loads(x))
#取出城市和销售额数据
city=dict_rdd.map(lambda x:(x['areaName'],int(x['money'])))
#按城市分组按销售额聚合
city_result_rdd=city.reduceByKey(lambda a,b:a+b)
#按销售额聚合结果进行排序
city_result_rdd.sortBy(lambda x:x[1],ascending=False,numPartitions=1)
print(f"需求1的结果{city_result_rdd.collect()}")

#TODO  需求2:全部城市有哪些商品类别在售卖
category_rdd=dict_rdd.map(lambda x:x['category']).distinct()
print(f"需求2的结果{category_rdd.collect()}")

#TODO   需求3:北京有哪些商品类型在售卖
#过滤出北京的数据
beijing_data_rdd=dict_rdd.filter(lambda x:x['areaName']=='北京')
#取出全部商品类别
result3=beijing_data_rdd.map(lambda x:x['category']).distinct()
print(f"需求3的结果{result3.collect()}")

六、数据输出

数据输出的方法:

collect算子
功能:将RDD各个分区内的数据,统一收集到Driver中,形成一个List对象 即 RDD—>LIST

rdd = sc.parallelize([1, 2, 3, 4, 5])
# collect算子,输出RDD为list对象
rdd_list: list = rdd.collect()
print(rdd_list)  # [1, 2, 3, 4, 5]
print(type(rdd_list))  #<class 'list'>

reduce算子
功能:对RDD数据集按照你传入的逻辑进行聚合
语法:rdd.reduce(func)
func:(T,T)—>T 2个参数,一个返回值,返回值和参数要求类型一致
逻辑图:

rdd = sc.parallelize(range(1,10))
# 将rdd的数据进行累加求和
print(rdd.reduce(lambda a,b:a+b))    #45

take算子
功能:取出RDD的前n个元素,组合成list返回

rdd=sc.parallelize([3,2,1,3,4,5]).take(5)
print(rdd)    #[3, 2, 1, 3, 4]

count算子
功能:计算RDD有多少条数据,返回值是一个数字

rdd_count=sc.parallelize([3,2,1,3,4,5]).count()
print(rdd_count)		#6

saveAsTextFile算子 功能:将RDD的数据写入文本文件中,支持本地写出,hdfs等文件系统

rdd1=sc.parallelize([3,2,1,3,4,5])
rdd1.saveAsTextFile("D:/output1")

修改rdd分区为1个
方法一:SparkConf对象设置属性全局并行度为1

conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
conf.set("spark.default.parallelism","1")
sc = SparkContext(conf=conf)

方法二:在创建RDD的时候,设置parallelize方法传入numSlices参数为1

rdd1=sc.parallelize([3,2,1,3,4,5],numSlices=1)
#rdd1=sc.parallelize([1,2,3,4,5],1)

文章出处登录后可见!

已经登录?立即刷新

共计人评分,平均

到目前为止还没有投票!成为第一位评论此文章。

(0)
青葱年少的头像青葱年少普通用户
上一篇 2023年11月6日
下一篇 2023年11月6日

相关推荐