Friday, March 8, 2019

Pyspark Pair RDD Aggregation operations

Ambari (Hortonworks) ---> Cloudera Manager (Cloudera)
cluster-management tools that hold the entire information of the cluster
----------------------------------------
combinebyKey()
groupByKey()
reduceByKey()
foldByKey()
aggregateByKey()
Hadoop distributions: Cloudera, Hortonworks, MapR, IBM BigInsights
AWS -- cloud services (provide h/w and s/w; machines are accessed in a remote manner)
GCP
Microsoft Azure
Use case: I need a cluster of big data machines (Linux) with Hadoop, Hive, Spark, Pig, Java and Python.
from pyspark.sql import SparkSession

# build a local SparkSession for the examples below
spark = SparkSession.builder.master('local').appName('aggrpaiRDD').getOrCreate()
spark
l1 = [('a',10),('a',15),('b',10),('b',20),('c',30),('a',30),('a',20),('b',10),
      ('c',15),('a',25),('b',30),('c',25)]
r1 = spark.sparkContext.parallelize(l1, 3)   # pair RDD with 3 partitions
print(r1.collect())
print(r1.getNumPartitions())
def f1(x): yield list(x)   # turn each partition's iterator into a list
r2 = r1.mapPartitions(lambda x: f1(x))
print(r2.collect())
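With 12 pairs split across 3 partitions, parallelize distributes them evenly, so r2.collect() should print three lists of four pairs each; these are the p1/p2/p3 groups used in the traces below.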
# pairRDD.reduceByKey(f1): f1 will be applied on the values of each key
res1=r1.reduceByKey(lambda x,y:x+y)
print(res1.collect())
The same approach with a named function:
def f1(x, y):
    return x + y
res1 = r1.reduceByKey(lambda x, y: f1(x, y))
print(res1.collect())
reduceByKey() trace (per partition, then across partitions):
p1: ('a',10),('a',15),('b',10),('b',20) --> (a,(10,15)),(b,(10,20)) --> (a,25),(b,30)
p2: ('c',30),('a',30),('a',20),('b',10) --> (c,30),(a,(30,20)),(b,10) --> (c,30),(a,50),(b,10)
p3: ('c',15),('a',25),('b',30),('c',25) --> (c,(15,25)),(a,25),(b,30) --> (c,40),(a,25),(b,30)
Across the partitions:
(a,(25,50,25)) --> (a,100)
(b,(30,10,30)) --> (b,70)
(c,(30,40))    --> (c,70)
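groupByKey() appears in the list at the top but is not run in this session; a minimal sketch of the same sum on r1 (my addition, the result should match reduceByKey):
res_grp = r1.groupByKey().mapValues(lambda vals: sum(vals))   # group values per key first, then sum
print(res_grp.collect())   # expected: (a,100),(b,70),(c,70); ordering may vary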
# pairRDD.foldByKey(zv, f1): f1 will be applied within the partitions and
# across the partitions, and the zero value is also included in each
# partition's list of grouped values
res2 = r1.foldByKey(2, lambda x, y: f1(x, y))
print(res2.collect())
foldByKey() trace (zero value = 2, added once per key per partition):
p1: (a,(2,10,15)) --> (a,27),  (b,(2,10,20)) --> (b,32)
p2: (c,(2,30)) --> (c,32),  (a,(2,30,20)) --> (a,52),  (b,(2,10)) --> (b,12)
p3: (c,(2,15,25)) --> (c,42),  (a,(2,25)) --> (a,27),  (b,(2,30)) --> (b,32)
Across the partitions:
(a,(27,52,27)) --> (a,106)
(b,(32,12,32)) --> (b,76)
(c,(32,42))    --> (c,74)
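As a quick check (not part of the original run), a zero value of 0 is the identity for addition, so foldByKey(0, ...) reproduces the reduceByKey totals:
res2b = r1.foldByKey(0, lambda x, y: x + y)   # zv=0 adds nothing per partition
print(res2b.collect())   # expected: (a,100),(b,70),(c,70)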
# pairRDD.aggregateByKey(zv, f1, f2): f1 will be applied on the grouped values
# within the partitions (starting from the zero value) and f2 will be applied
# across the partitions
def f2(x, y):
    return x - y
res3 = r1.aggregateByKey(2, lambda x, y: f1(x, y), lambda x, y: f2(x, y))
print(res3.collect())
aggregateByKey() trace building (count, sum) pairs per key:
Within the partitions:
p1: (a,(10,15)) --> (a,(2,25)),  (b,(10,20)) --> (b,(2,30))
p2: (c,(30)) --> (c,(1,30)),  (a,(30,20)) --> (a,(2,50)),  (b,(10)) --> (b,(1,10))
p3: (c,(15,25)) --> (c,(2,40)),  (a,(25)) --> (a,(1,25)),  (b,(30)) --> (b,(1,30))
Across the partitions:
(a,((2,25),(2,50),(1,25))) --> (a,(2+2+1, 25+50+25)) --> (a,(5,100))
(b,((2,30),(1,10),(1,30))) --> (b,(2+1+1, 30+10+30)) --> (b,(4,70))
(c,((1,30),(2,40)))        --> (c,(1+2, 30+40))      --> (c,(3,70))
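The res3 code above uses a zero value of 2 with add/subtract, while this trace illustrates building (count, sum) pairs; a sketch that actually matches the trace (the name res_cs is mine):
res_cs = r1.aggregateByKey((0, 0),
                           lambda acc, v: (acc[0] + 1, acc[1] + v),    # within a partition
                           lambda a, b: (a[0] + b[0], a[1] + b[1]))    # across partitions
print(res_cs.collect())   # expected: (a,(5,100)),(b,(4,70)),(c,(3,70))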
# pairRDD.combineByKey(f1, f2, f3)
# f1 creates a pair out of the first value of a key (generates the zero value from a function)
# f2 will be applied within the partitions
# f3 will be applied across the partitions
r3 = r1.map(lambda x: (x[0], float(x[1])))

def fn1(x):              # first value of a key --> (count, sum) pair
    return (1, x)
def fn2(x, y):           # x is a (count, sum) tuple and y is a value
    return (x[0] + 1, x[1] + y)
def fn3(x, y):           # both x and y are (count, sum) tuples
    return (x[0] + y[0], x[1] + y[1])
res4 = r3.combineByKey(fn1, fn2, fn3)
print(res4.collect())
res5 = res4.map(lambda x: (x[0], x[1][1] / x[1][0]))   # sum / count = average per key
print(res5.collect())
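Given the (count, sum) pairs above, res5 should hold the per-key averages: a --> 100/5 = 20.0, b --> 70/4 = 17.5, c --> 70/3 ≈ 23.33 (ordering may vary).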
RDD/DF --> transformations and actions
persistence, resilience, lineage, stages, tasks
cluster
executor
driver
spark-submit
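A minimal spark-submit invocation for running a script like this one on the local master (the file name is illustrative):
spark-submit --master local aggr_pair_rdd.py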
---------------------------------
Python programming
conditionals, loops
                 
