Friday, March 8, 2019

PySpark RDD: various operations


pair RDD --> keys, values, distinct, min, max, mapValues, flatMapValues
join --> inner, left outer, right outer, full outer
set operations --> union, intersection, subtraction, cartesian, subtractByKey, keyBy
repartition vs coalesce
zip, zipWithIndex, glom
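
All the snippets below assume a SparkSession called spark is already in scope (it is created automatically in the pyspark shell). A minimal sketch for a standalone script, with an illustrative app name and local master, might look like:

from pyspark.sql import SparkSession

# "rdd-operations-demo" and local[*] are illustrative choices, not from the post
spark = (SparkSession.builder
         .appName("rdd-operations-demo")
         .master("local[*]")
         .getOrCreate())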

r1=spark.sparkContext.parallelize([('a',10),('b',5),('c',15),('d',12),('a',10),('b',30)])
r2=spark.sparkContext.parallelize([('a',50),('b',15),('d',15),('e',12),('c',10),('a',30)])

r3=spark.sparkContext.parallelize([20,30,40,50,60])
print(r3.max())
60
print(r3.min())
20

print(r3.mean())
40.0

print(r3.variance())
200.0
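
A couple of related numeric actions round out min/max/mean/variance (a small sketch; sum, stdev and stats are standard RDD actions):

print(r3.sum())      # 200
print(r3.stdev())    # population standard deviation = sqrt(200) ≈ 14.14
print(r3.stats())    # StatCounter summarising count, mean, stdev, max and min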

print(r1.collect())
[('a', 10), ('b', 5), ('c', 15), ('d', 12), ('a', 10), ('b', 30)]

print(r1.keys().collect())
['a', 'b', 'c', 'd', 'a', 'b']

print(r1.keys().distinct().collect())
['a', 'c', 'b', 'd']

print(r1.values().collect())
[10, 5, 15, 12, 10, 30]

print(r1.values().distinct().collect())
[10, 12, 5, 30, 15]
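
min, max and mapValues from the list at the top also work on the pair RDD itself; a small sketch on r1 (the key= argument chooses which component to compare on):

print(r1.max(key=lambda kv: kv[1]))   # ('b', 30) -- the pair with the largest value
print(r1.min(key=lambda kv: kv[1]))   # ('b', 5)  -- the pair with the smallest value
print(r1.mapValues(lambda v: v*2).collect())
# [('a', 20), ('b', 10), ('c', 30), ('d', 24), ('a', 20), ('b', 60)]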

# joins -- involve shuffling of data: inner, left outer, right outer, full outer
j1=r1.join(r2)  # join() performs an inner join
print(j1.collect())

[('a', (10, 50)), ('a', (10, 30)), ('a', (10, 50)), ('a', (10, 30)), ('c', (15, 10)), ('b', (5, 15)), ('b', (30, 15)), ('d', (12, 15))]
j2=r1.leftOuterJoin(r2)
print(j2.collect())
[('a', (10, 50)), ('a', (10, 30)), ('a', (10, 50)), ('a', (10, 30)), ('c', (15, 10)), ('b', (5, 15)), ('b', (30, 15)), ('d', (12, 15))]
# every key in r1 also exists in r2, so the left outer join here matches the inner join
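
rightOuterJoin and fullOuterJoin from the list above are not shown in the post; a quick sketch on the same RDDs (keys missing on one side come back as None):

j3=r1.rightOuterJoin(r2)   # 'e' exists only in r2, so its r1 side is None
print(j3.collect())        # includes ('e', (None, 12)) alongside the matched keys
j4=r1.fullOuterJoin(r2)    # keeps unmatched keys from both sides
print(j4.collect())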

x1=spark.sparkContext.parallelize([(1,2,3),(2,3,4),(3,4,5),(4,5,6)])
x2=spark.sparkContext.parallelize([(10,20),(20,30),(30,40)])
y1=x1.join(x2)
print(y1.collect())
[]
# pair-RDD operations treat the first element as the key, so x1's keys (1,2,3,4)
# never match x2's keys (10,20,30) and the inner join is empty

d1=spark.sparkContext.parallelize([('a',(20,30,40,50)),('b',(1,2,3))])
# flattening the values of 'a' should give: ('a',20) ('a',30) ('a',40) ('a',50)
print(d1.count())
d1.map(lambda x:(x[0],x[1])).collect()   # a plain map keeps the value tuple intact; it does not flatten it
d1.flatMapValues(lambda x:x).collect()
[('a', 20), ('a', 30), ('a', 40), ('a', 50), ('b', 1), ('b', 2), ('b', 3)]

d1.map(lambda x:(x[0],len(x[1]))).collect()
[('a', 4), ('b', 3)]

d1.mapValues(lambda x:len(x)).collect()
[('a', 4), ('b', 3)]

d1=spark.sparkContext.parallelize([(1,2,40),(2,3),(3,4,20)])
d1.flatMap(lambda x:x).collect()

[1, 2, 40, 2, 3, 3, 4, 20]
d1=spark.sparkContext.parallelize([(101,'shankar','btech'),(102,'kumar','bsc')])
d2=d1.map(lambda x:(x[0],x[1],x[2]))
print(d2.collect())
[(101, 'shankar', 'btech'), (102, 'kumar', 'bsc')]

d2=d1.keyBy(lambda x:x[0])
print(d2.collect())

[(101, (101, 'shankar', 'btech')), (102, (102, 'kumar', 'bsc'))]
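
subtractByKey is listed at the top but not demonstrated; a small sketch against r1, where the RDD of keys to drop is illustrative (its values are ignored):

drop_keys=spark.sparkContext.parallelize([('a',0),('b',0)])
print(r1.subtractByKey(drop_keys).collect())
# [('c', 15), ('d', 12)]   -- ordering may vary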

# set operations - union, intersection, subtraction, cartesian
e1=spark.sparkContext.textFile("calllogdata")
print(e1.getNumPartitions())
print(d1.getNumPartitions())
create RDD from a local file/directory ---> partitions
Spark uses the Hadoop library for reading and writing the data
spark (3 partitions) --> hadoop library (blocks) --> LFS (32 MB each)

default block size: hadoop 1.x -> 64 MB, hadoop 2.x --> 128 MB

the storage system decides how many partitions Spark creates:
HDFS --> blocks
LFS --> 32 MB chunks

in Spark, the no. of partitions == no. of input splits
every input split is a block,
but not every block is an input split
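
If more parallelism is wanted than the default split count gives, textFile accepts a minPartitions hint; a small sketch on the same file (e1_min is just an illustrative name):

e1_min=spark.sparkContext.textFile("calllogdata", minPartitions=4)
print(e1_min.getNumPartitions())   # at least 4, depending on the file size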
success=e1.filter(lambda x:'SUCCESS' in x)
drop=e1.filter(lambda x:'DROPED' in x)
fail=e1.filter(lambda x:'FAILED' in x)
union=success.union(drop)
print(union.getNumPartitions())
inter=success.intersection(drop)
e2=e1.repartition(5)
print(e2.getNumPartitions())

repartition(n) -> increases or decreases the number of partitions to n; this involves a full shuffle...
e3=e1.repartition(2)  #involves shuffling

print(e3.getNumPartitions())
c1=e2.coalesce(2)  # doesn't involve a full shuffle (only merges existing partitions)
print(c1.getNumPartitions())

example: 3 partitions with 100 records each
    f1    f2    f3
    100   100   100

increasing (repartition is compulsory): 5 partitions of 60 records each
    f1   f2   f3   f4   f5
    60   60   60   60   60

decreasing (repartition or coalesce): 2 partitions of 150 records each
    f1    f2
    150   150

to write an RDD/DataFrame with 10 partitions out to HDFS as a single file:

rdd.repartition(1).saveAsTextFile(...)   # full shuffle into one partition
rdd.coalesce(1).saveAsTextFile(...)      # merges partitions into a single unit of data without a full shuffle
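
zip, zipWithIndex and glom from the list at the top are not shown elsewhere; a small sketch, with glom also being a handy way to see what repartition/coalesce actually put into each partition:

a=spark.sparkContext.parallelize(['x','y','z'],3)
b=spark.sparkContext.parallelize([1,2,3],3)       # zip needs the same partition/element layout
print(a.zip(b).collect())                         # [('x', 1), ('y', 2), ('z', 3)]
print(a.zipWithIndex().collect())                 # [('x', 0), ('y', 1), ('z', 2)]
print(b.glom().collect())                         # [[1], [2], [3]] -- one inner list per partition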

union --> no shuffling
intersection --> shuffling
subtract --> shuffling
cartesian (cross product) --> no shuffling
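
To go with the summary above, a quick sketch of subtract and cartesian on the RDDs defined earlier (ordering of the collected results may vary):

print(r1.subtract(r2).collect())
# no exact (key, value) pair of r1 occurs in r2, so all of r1's pairs come back
print(r1.cartesian(r3).take(3))
# first few of the 6 x 5 = 30 (r1 element, r3 element) combinations,
# e.g. (('a', 10), 20), (('a', 10), 30), (('a', 10), 40)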

