Friday, March 8, 2019

PySpark Aggregation Operations


Aggregation operations:
Aggregation: an RDD is split into partitions, so an aggregation over the RDD is computed as an aggregation over each partition, whose results are then combined:
aggr(RDD)-->aggr(partitions)
from pyspark.sql import SparkSession
spark=SparkSession.builder.master('local').appName('aggregation').getOrCreate()

l1=[1,2,3,4,5,6,7,8]
r1=spark.sparkContext.parallelize(l1,3) #creation of RDD
print(r1.getNumPartitions())
3
How to see which elements are in each partition:
def f1(x):yield list(x) #generator: yields one partition's elements as a list
r2=r1.mapPartitions(lambda x:f1(x)) #for viewing the data of each partition
r2.collect()
[[1, 2], [3, 4], [5, 6, 7, 8]]

def f2(x):yield sum(x) #generator: yields the sum of one partition
r3=r1.mapPartitions(lambda x:f2(x)) #for viewing the sum of each partition
r3.collect()
[3, 7, 26]

a1=r1.reduce(lambda x,y:x+y)
print(type(a1))
<class 'int'>
print(a1)
36
a2=r1.reduce(lambda x,y:x-y)
print(a2)
16

RDD--> p1       p2       p3
       1,2      3,4      5,6,7,8
x+y    1+2=3    3+4=7    5+6+7+8=26 --> 3+7+26=36

When Spark performs an aggregation:
1. it performs the operation on each partition individually
2. the same operation is then applied to the results of the partitions

Spark does not guarantee the order in which partition results are combined, so the operation must be order-independent: a+b == b+a, but a-b != b-a.
Irrespective of the order of the partition results, the outcome has to be always the same.

m1 m2 m3 m4 (machines)
p1 p2 p3 p4 (partition results)
p1+p2+p3+p4 --> x
p2+p3+p4+p1 --> x
addition --> a+b+c+d = c+b+a+d = b+d+c+a
In reduce, use only operations that generate a consistent result in any order:
reduce(lambda x,y:x+y) --> yes
reduce(lambda x,y:x-y) --> no (it won't be consistent)
However we consolidate the results of all partitions, the answer must be the same -- see the sketch below.
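To see this concretely, here is a minimal sketch (not in the original notes; it reuses the spark session created above) showing that reduce with subtraction gives a partition-dependent answer while addition does not:

for n in [1,2,3,4]:
    r=spark.sparkContext.parallelize([1,2,3,4,5,6,7,8],n)
    # addition: same result for any number of partitions
    # subtraction: the result changes with the partitioning
    print(n,r.reduce(lambda x,y:x+y),r.reduce(lambda x,y:x-y))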
Get the maximum element in the given RDD:
def f3(x,y):
    if x>y:
        return x
    else:
        return y
a3=r1.reduce(lambda x,y:f3(x,y))
print(a3)

8
Get the minimum element in the given RDD:
def f3(x,y):
    if x>y:
        return y
    else:
        return x
a3=r1.reduce(lambda x,y:f3(x,y))
print(a3)
1

p1: 1,2 --> min(1,2)=1
p2: 3,4 --> min(3,4)=3
p3: 5,6,7,8 --> min(5,6)=5, min(5,7)=5, min(5,8)=5 --> 5

partition results 1,3,5:
min(1,3)=1, min(1,5)=1 --> 1
reduce --> an Action --> no shuffling (the per-partition results go straight to the driver)
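As a shortcut (not part of the original walkthrough), Python's built-in max and min are already two-argument callables, so the same reduces can be written without defining f3; RDDs also provide max() and min() actions directly:

print(r1.reduce(max)) # 8
print(r1.reduce(min)) # 1
print(r1.max())       # 8 (built-in RDD action)
print(r1.min())       # 1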
b1=r1.fold(2,lambda x,y:x+y)
print(b1)

44
b2=r1.fold(0,lambda x,y:x+y)
print(b2)
36

fold(zeroValue,fun)
fun --> lambda x,y:x+y
p1: 1,2 --> 2+1=3 --> 3+2=5
p2: 3,4 --> 2+3=5 --> 5+4=9
p3: 5,6,7,8 --> 2+5=7 --> 7+6=13 --> 13+7=20 --> 20+8=28

2 (initial value) + 5+9+28 ==> 44
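Because the zero value participates once per partition plus once more in the final merge, fold with a non-identity zero value gives an answer that depends on the number of partitions. A small sketch (same data as above) to check the formula result = sum(data) + zeroValue*(numPartitions+1):

data=[1,2,3,4,5,6,7,8]
for n in [1,2,3,4]:
    r=spark.sparkContext.parallelize(data,n)
    # the two printed values match for every n
    print(n,r.fold(2,lambda x,y:x+y),sum(data)+2*(n+1))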
rd1=spark.sparkContext.parallelize(["hyde","chn","delhi","pune","mumbai","bnglr","guwahati"],2)
rd2=rd1.mapPartitions(lambda x:f1(x))
print(rd2.collect())

[['hyde', 'chn', 'delhi'], ['pune', 'mumbai', 'bnglr', 'guwahati']]

rd2=spark.sparkContext.parallelize([2000,12000,30000,25000,42000,10000],2)
print(rd2.collect())

rd32=rd2.fold(5000,lambda x,y:x+y)
print(rd32)

136000
(the data sums to 121000; the zero value 5000 is added once per partition and once in the final merge: 121000+5000*2+5000=136000)
Another trace: fold with the min function f3 and zero value 5000, over partitions [12000,30000] and [25000,4000,10000]:
fold(5000,f3)
p1: min(5000,12000)=5000, min(5000,30000)=5000 --> 5000
p2: min(5000,25000)=5000, min(5000,4000)=4000, min(4000,10000)=4000 --> 4000
final merge: min(5000,5000)=5000, min(5000,4000)=4000 --> 4000
Note: this finds the true minimum only because 4000 < 5000; if every element were larger than the zero value, fold would wrongly return 5000. The zero value should be the identity for the operation (0 for +, float('inf') for min).
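A runnable version of this trace (a sketch; parallelize slices this 5-element list into the 2-way split shown above, and f3 is the min function defined earlier):

rdm=spark.sparkContext.parallelize([12000,30000,25000,4000,10000],2)
print(rdm.fold(5000,f3))         # 4000 -- only because 4000 < 5000
print(rdm.fold(float('inf'),f3)) # 4000 -- inf is the safe identity for min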

print(r1.collect())
c1=r1.aggregate(2,(lambda x,y:x+y),(lambda x,y:x-y))
print(c1)
-40

aggregate(zeroValue,f1,f2)
f1 will be applied on each partition's data, starting from the zero value
f2 will be applied on the results of the partitions, starting from the zero value
p1: 1,2 --> 2+1=3 --> 3+2=5
p2: 3,4 --> 2+3=5 --> 5+4=9
p3: 5,6,7,8 --> 2+5=7 --> 7+6=13 --> 13+7=20 --> 20+8=28
results 5,9,28 --> 2-5=-3 --> -3-9=-12 --> -12-28=-40
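A more typical use of aggregate (an illustrative sketch, not from the original notes): the accumulator can be a different type than the elements, e.g. a (sum,count) pair that computes an average in one pass:

sum_count=r1.aggregate(
    (0,0),                                 # zero value: (running_sum, running_count)
    lambda acc,v:(acc[0]+v,acc[1]+1),      # f1: fold one element into the pair
    lambda a,b:(a[0]+b[0],a[1]+b[1]))      # f2: merge two partition pairs
print(sum_count)                 # (36, 8)
print(sum_count[0]/sum_count[1]) # 4.5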

reduce(f1) --> f1 is applied within each partition and then on the results of the partitions
fold(zv,f1) --> f1 is applied within each partition and on the results of the partitions, each time starting from the zero value zv
aggregate(zv,f1,f2) --> f1 is applied within each partition starting from zv, and f2 is applied on the results of the partitions starting from zv
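A quick recap on the same RDD (a sketch; with an identity zero value all three agree regardless of partitioning):

print(r1.reduce(lambda x,y:x+y))                          # 36
print(r1.fold(0,lambda x,y:x+y))                          # 36 (0 is the identity for +)
print(r1.aggregate(0,(lambda x,y:x+y),(lambda x,y:x+y)))  # 36 (same function for both steps)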

