Transformations:
----------------
2 types:
1. Narrow
2. Wide
RDD/DF --> internally it is a collection of partitions
rd1=spark.sparkContext.textFile("file:///home/hadoop/calllogdata")
rd1.persist()
rd2=rd1.map(lambda x:x.split(","))   # illustrative lambda; splitting each call-log line into fields is one possible choice
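Since an RDD is just a collection of partitions, a quick way to verify this on rd1 (a minimal sketch using the standard getNumPartitions/glom methods):

print(rd1.getNumPartitions())          # how many partitions the file was split into
print(rd1.glom().map(len).collect())   # number of lines held in each partition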
Example: counting words spread across 3 machines/partitions

   m1        m2            m3
   spark     spark         hadoop,hive
   spark     spark,hive

   spark  --> 3 --> its records are shuffled from m2 to m1
   hadoop --> 2 --> its records are shuffled from m2 to m3
A wide transformation is one that involves shuffling (redistribution of data across partitions).
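A minimal runnable sketch of a wide transformation, assuming made-up sample data; reduceByKey has to bring all counts for the same word onto one partition, which forces a shuffle:

words=spark.sparkContext.parallelize(["spark","spark","spark","hadoop","hadoop","hive"],3)
counts=words.map(lambda w:(w,1)).reduceByKey(lambda a,b:a+b)   # wide: shuffles records by key
print(counts.collect())   # e.g. [('spark', 3), ('hadoop', 2), ('hive', 1)]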
x1
   m1        m2         m3
   spark     hadoop     spark
   hadoop    hive       Hadoop
   hive      pig

x1.filter(lambda x:'spark' in x)
   m1        m2         m3
   spark                spark

x1.map(lambda x:x+"hi")
   m1        m2         m3
   sparkhi   hadoophi   sparkhi
   hadoophi  hivehi     Hadoophi
   hivehi    pighi
Narrow transformations are the ones that do not involve shuffling; each output partition is computed from a single input partition.
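The same filter/map example as a runnable sketch (sample data assumed); glom() shows the contents of each partition, making it visible that every partition is processed on its own with no data movement:

x1=spark.sparkContext.parallelize(["spark","hadoop","hive","hadoop","hive","pig","spark","Hadoop"],3)
print(x1.filter(lambda x:'spark' in x).glom().collect())   # e.g. [['spark'], [], ['spark']]
print(x1.map(lambda x:x+"hi").glom().collect())            # each partition is mapped in place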
HDFS
    n1        n2        n3        n4
    b1        b2        b3        b4       ----> filex (blocks on the data nodes)
    |         |         |         |
    p1        p2        p3        p4       ----> r1 = rdd (partitions in memory)
    |         |         |         |
   p1.t1     p2.t1     p3.t1     p4.t1     ----> t1 runs as one task per partition
    |         |         |         |
    px1       px2       px3       px4      ----> r2
    |         |         |         |
   px1.t2    px2.t2    px3.t2    px4.t2    ----> t2 runs as one task per partition
    |         |         |         |
    py1       py2       py3       py4      ----> r3
r1=spark.sparkContext.textFile("hdfs://...filex")   # one partition per HDFS block
r2=r1.t1()   # t1: placeholder for a narrow transformation
r3=r2.t2()   # t2: placeholder for another narrow transformation
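A concrete version of the same lineage, with the placeholder t1/t2 replaced by illustrative narrow transformations (map and filter chosen only as examples):

r1=spark.sparkContext.textFile("hdfs://...filex")   # p1..p4
r2=r1.map(lambda line:line.strip())                 # px1..px4, t1 applied per partition
r3=r2.filter(lambda line:len(line)>0)               # py1..py4, t2 applied per partition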
DataFrame:
from pyspark.sql.functions import count
df1=spark.read.format('csv').option('header',True).option('inferSchema',True).load(" ")   # CSV file path goes here
df1.show()
df2=df1.where("Age>=50")
df2.show()
df3=df2.select('ID','Age','State','Sex')
df4=df3.groupBy('Sex').agg(count('*'))
df4.show()
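One way to see which of these steps needs a shuffle is the physical plan: the groupBy/agg step shows up as an Exchange (shuffle), while where() and select() stay narrow:

df4.explain()   # look for the Exchange operator introduced by groupBy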
spark core --> RDD
Spark DataFrame is built on top of RDD --> Schema RDD
DF  --> can be created from files/tables/Directory
RDD --> can be created from files/Directory
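A small sketch of going back and forth between the two (the Row data is made up), which shows that a DataFrame is essentially an RDD of rows plus a schema:

from pyspark.sql import Row
rdd1=spark.sparkContext.parallelize([Row(ID=1,Age=55),Row(ID=2,Age=40)])
dfx=spark.createDataFrame(rdd1)   # RDD --> DataFrame (schema taken from the Row fields)
dfx.printSchema()
print(dfx.rdd.collect())          # DataFrame --> its underlying RDD of Row objects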
y5=spark.sparkContext.wholeTextFiles(" ")   # each record is a (file name, file content) pair
print(y5.count())                           # number of files read
print(y5.map(lambda x:x[0]).collect())      # x[0] is the file name