Aggregation operations:
Aggregation: RDD --> partitions
aggr(RDD) --> aggr(partitions), i.e. an aggregation on an RDD is carried out on its partitions.
from pyspark.sql import SparkSession
spark=SparkSession.builder.master('local').appName('aggregation').getOrCreate()
l1=[1,2,3,4,5,6,7,8]
r1=spark.sparkContext.parallelize(l1,3)   #creation of an RDD with 3 partitions
print(r1.getNumPartitions())
3
How to know what the elements in each partition are:
def f1(x): yield list(x)   #generator function: yields one partition's elements as a list
r2=r1.mapPartitions(lambda x:f1(x)) #for viewing the data of the partitions
r2.collect()
[[1, 2], [3, 4], [5, 6, 7, 8]]
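The RDD API also has a built-in glom() transformation that gives the same view of each partition's contents without the helper function:
r1.glom().collect()   #each inner list is one partition's data
[[1, 2], [3, 4], [5, 6, 7, 8]]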
def f2(x): yield sum(x)   #generator function: yields the sum of one partition
r3=r1.mapPartitions(lambda x:f2(x))   #for viewing the sum of each partition's data
r3.collect()
[3, 7, 26]
a1=r1.reduce(lambda x,y:x+y)
type(a1)
print(a1)
36
a2=r1.reduce(lambda x,y:x-y)
print(a2)
16
RDD --> p1        p2        p3
        1,2       3,4       5,6,7,8
x+y     1+2       3+4       5+6+7+8
        3         7         26   --> 3+7+26 = 36
When Spark performs an aggregation:
1. it performs the operation on each partition individually
2. the same operation is then applied on the results of the partitions
Spark does not maintain the order in which partition results are combined, so only order-independent operations are safe: a+b = b+a, but a-b != b-a.
Irrespective of the order of the results of the partitions, the outcome has to be always the same.
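The two steps can be checked with r3 from above, which already holds the per-partition sums [3, 7, 26]; applying the same operation to those results reproduces the reduce output:
r3.reduce(lambda x,y:x+y)   #step 2: combine the per-partition results
36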
m1    m2    m3    m4
p1    p2    p3    p4
p1+p2+p3+p4 --> x
p2+p3+p4+p1 --> x
addition --> a+b+c+d = c+b+a+d = b+d+c+a
Use only those operations as aggregations which can generate a consistent result always.
reduce(lambda x,y:x+y) --> yes
reduce(lambda x,y:x-y) --> no (it won't be consistent, as shown in the sketch below)
If we consolidate the results of all partitions, the outcome must be the same.
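A minimal sketch of that inconsistency: repartitioning the same data changes the subtraction result but not the addition result (the variable r4 is only illustrative, and it assumes Spark splits the 8 elements into [1,2,3,4] and [5,6,7,8]):
r4=spark.sparkContext.parallelize(l1,2)   #same data, 2 partitions instead of 3
print(r4.reduce(lambda x,y:x-y))   #(1-2-3-4) - (5-6-7-8) = -8 - (-16) = 8, not 16
print(r4.reduce(lambda x,y:x+y))   #addition still gives 36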
Get the maximum element in the given RDD:
def f3(x,y):
    if x>y:
        return x
    else:
        return y
a3=r1.reduce(lambda x,y:f3(x,y))
print(a3)
8
Get the minimum element in the given RDD:
def f3(x,y):
    if x>y:
        return y
    else:
        return x
a3=r1.reduce(lambda x,y:f3(x,y))
print(a3)
1
1,2 ---> 1<2 ==> 1
3,4 ---> 3<4 ==> 3
5,6,7,8 ---> 5<6 --> 5<7 --> 5<8 ==> 5
1,3,5 ---> 1<3 --> 1<5 ==> 1
reduce --> Action --> no shuffling
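The RDD API also has built-in max() and min() actions that return the same values without writing f3:
print(r1.max())   #8
print(r1.min())   #1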
b1=r1.fold(2,lambda x,y:x+y)
print(b1)
44
b2=r1.fold(0,lambda x,y:x+y)
print(b2)
36
fold(num,fun)
fun --> lambda x,y:x+y
1,2 --> num+1 --> 2+1 ==> 3+2 ==> 5
3,4 --> 2+3 --> 5+4 ==> 9
5,6,7,8 --> 2+5 --> 7+6 --> 13+7 --> 20+8 ==> 28
2 (initial value) + 5 + 9 + 28 ==> 44
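Because the zero value is applied once per partition and once more for the combine, a non-neutral zero value makes the fold result depend on the number of partitions. A small sketch (r5 is only an illustrative name):
r5=spark.sparkContext.parallelize(l1,4)   #same data, 4 partitions
print(r5.fold(2,lambda x,y:x+y))   #36 + 2*(4 partitions + 1 combine) = 46
print(r5.fold(0,lambda x,y:x+y))   #0 is neutral for +, so still 36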
rd1=spark.sparkContext.parallelize(["hyde","chn","delhi","pune","mumbai","bnglr","guwahati"],2)
rd2=rd1.mapPartitions(lambda x:f1(x))
print(rd2.collect())
[['hyde', 'chn', 'delhi'], ['pune', 'mumbai', 'bnglr',
'guwahati']]
rd2=spark.sparkContext.parallelize([2000,12000,30000,25000,42000,10000],2)
print(rd2.collect())
rd32=rd2.fold(5000,lambda x,y:x+y)
print(rd32)
136000
The sum of the six values is 121000; the zero value 5000 is added once per partition (2 partitions) and once more when the partition results are combined: 121000 + 3*5000 = 136000.
Trace of fold(5000, f3(x,y)) with the minimum function f3:
[12000, 30000]
5000
5000<12000 --> 5000
5000<30000 --> 5000
[25000, 4000, 10000]
5000
5000<25000 --> 5000
5000<4000 --> 4000
4000<10000 --> 4000
Combining the partition results with the zero value 5000: 5000, 5000, 4000 --> 4000
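A small sketch of that trace (rd3 and the five values are only illustrative, assuming Spark splits them into [12000, 30000] and [25000, 4000, 10000]):
rd3=spark.sparkContext.parallelize([12000,30000,25000,4000,10000],2)
print(rd3.fold(5000,lambda x,y:f3(x,y)))   #minimum fold with zero value 5000
4000   #the zero value 5000 replaces 12000 as the first partition's local result, but the global minimum 4000 still wins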
print(r1.collect())
c1=r1.aggregate(2,(lambda x,y:x+y),(lambda x,y:x-y))
print(c1)
-40
aggregate(zero value,f1,f2)
f1 will be applied on the partitions' data with the zero value
f2 will be applied on the results of the partitions
1,2-->2+1==>3+2==>5
3,4-->2+3-->5+4==>9
5,6,7,8->2+5-->7+6==>13+7==>20+8==>28
5,9,28==>2-5==>-3-9==>-12-28==>-40
reduce(f1) --> f1 will be applied on the partitions and on the results of the partitions
fold(zv,f1) --> f1 will be applied on the partitions and on the results of the partitions, with the zero value zv
aggregate(zv,f1,f2) --> f1 will be applied on the partitions with zv, and f2 will be applied on the results of the partitions with zv (see the sketch below)
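aggregate is useful when the per-partition function and the combine function have different shapes, for example building a (sum, count) pair to get an average. A minimal sketch (sum_count is only an illustrative name):
sum_count=r1.aggregate((0,0),
                       (lambda acc,v:(acc[0]+v,acc[1]+1)),      #f1: fold each element into (running sum, running count)
                       (lambda a,b:(a[0]+b[0],a[1]+b[1])))      #f2: merge the per-partition (sum, count) pairs
print(sum_count)   #(36, 8)
print(sum_count[0]/sum_count[1])   #average = 4.5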