pairRDD-->keys,values,distinct,min,max,mapValues,flatMapValues
join --> inner, left outer, right outer, full outer
set operations --> union, intersection, subtraction, cartesian, subtractByKey, keyBy
repartition vs coalesce
zip, zipWithIndex, glom (see the sketch below)
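zip, zipWithIndex and glom from the list above are not demonstrated later in these notes; a minimal sketch (the RDD names n1/n2 are only for illustration):
n1=spark.sparkContext.parallelize([10,20,30,40,50],2)
n2=spark.sparkContext.parallelize(['a','b','c','d','e'],2)
print(n1.zip(n2).collect())         # pairs elements by position -> [(10,'a'), (20,'b'), ...]; both RDDs need the same number of partitions and elements per partition
print(n1.zipWithIndex().collect())  # pairs each element with its index -> [(10, 0), (20, 1), ...]
print(n1.glom().collect())          # one python list per partition -> [[10, 20], [30, 40, 50]]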
r1=spark.sparkContext.parallelize([('a',10),('b',5),('c',15),('d',12),('a',10),('b',30)])
r2=spark.sparkContext.parallelize([('a',50),('b',15),('d',15),('e',12),('c',10),('a',30)])
r3=spark.sparkContext.parallelize([20,30,40,50,60])
print(r3.max())
60
print(r3.min())
20
print(r3.mean())
40.0
print(r3.variance())
200.0
print(r1.collect())
[('a', 10), ('b', 5), ('c', 15), ('d', 12), ('a', 10), ('b', 30)]
print(r1.keys().collect())
['a', 'b', 'c', 'd', 'a', 'b']
print(r1.keys().distinct().collect())
['a', 'c', 'b', 'd']
print(r1.values().collect())
[10, 5, 15, 12, 10, 30]
print(r1.values().distinct().collect())
[10, 12, 5, 30, 15]
# joins -- involve shuffling of data -- inner, left outer, right outer, full outer
j1=r1.join(r2)  # by default an inner join
print(j1.collect())
[('a', (10, 50)), ('a', (10, 30)), ('a', (10, 50)), ('a', (10, 30)), ('c', (15, 10)), ('b', (5, 15)), ('b', (30, 15)), ('d', (12, 15))]
j2=r1.leftOuterJoin(r2)
print(j2.collect())
[('a', (10, 50)), ('a', (10, 30)), ('a', (10, 50)), ('a', (10, 30)), ('c', (15, 10)), ('b', (5, 15)), ('b', (30, 15)), ('d', (12, 15))]
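The notes above only run the inner and left outer joins; a quick sketch of the other two listed at the top, using the same r1 and r2:
j3=r1.rightOuterJoin(r2)
print(j3.collect())   # includes ('e', (None, 12)) because 'e' exists only in r2
j4=r1.fullOuterJoin(r2)
print(j4.collect())   # keeps keys from both sides, padding the missing side with None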
x1=spark.sparkContext.parallelize([(1,2,3),(2,3,4),(3,4,5),(4,5,6)])
x2=spark.sparkContext.parallelize([(10,20),(20,30),(30,40)])
y1=x1.join(x2)
print(y1.collect())
[]
# empty result: the keys of x1 (1,2,3,4) and the keys of x2 (10,20,30) have nothing in common
d1=spark.sparkContext.parallelize([('a',(20,30,40,50)),('b',(1,2,3))])
# flatMapValues expands ('a',(20,30,40,50)) into a,20  a,30  a,40  a,50
print(d1.count())
d1.map(lambda x:(x[0],x[1])).collect()  # plain map returns the (key, value-tuple) pairs unchanged, it does not flatten
d1.flatMapValues(lambda x:x).collect()
[('a', 20), ('a', 30), ('a', 40), ('a', 50), ('b', 1), ('b', 2), ('b', 3)]
d1.map(lambda x:(x[0],len(x[1]))).collect()
[('a', 4), ('b', 3)]
d1.mapValues(lambda x:len(x)).collect()
[('a', 4), ('b', 3)]
d1=spark.sparkContext.parallelize([(1,2,40),(2,3),(3,4,20)])
d1.flatMap(lambda x:x).collect()
[1, 2, 40, 2, 3, 3, 4, 20]
d1=spark.sparkContext.parallelize([(101,'shankar','btech'),(102,'kumar','bsc')])
d2=d1.map(lambda x:(x[0],x[1],x[2]))
print(d2.collect())
[(101, 'shankar', 'btech'), (102, 'kumar', 'bsc')]
d2=d1.keyBy(lambda x:x[0])
print(d2.collect())
[(101, (101, 'shankar', 'btech')), (102, (102, 'kumar', 'bsc'))]
# set operations - union, intersection, subtraction, cartesian
e1=spark.sparkContext.textFile("calllogdata")
print(e1.getNumPartitions())
print(d1.getNumPartitions())
creating an RDD from a local file/directory ---> partitions
spark uses the hadoop library for reading and writing the data
spark (3 partitions) --> hadoop library (blocks) --> LFS (32 MB each)
hadoop 1.x --> 64 MB block size
hadoop 2.x --> 128 MB block size
spark --> the storage system decides how many partitions
HDFS --> blocks; LFS --> 32 MB
spark: no. of partitions == no. of input splits
every input split is a block
not every block is an input split
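A quick sketch of the partitions == input splits point, reusing the calllogdata path already loaded as e1 (the name e1b and minPartitions=4 are just for illustration; minPartitions is only a lower bound, the hadoop input format decides the actual splits):
e1b=spark.sparkContext.textFile("calllogdata",minPartitions=4)
print(e1b.getNumPartitions())   # at least 4: the file is cut into more, smaller input splits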
success=e1.filter(lambda x:'SUCCESS' in x)
drop=e1.filter(lambda x:'DROPED' in x)
fail=e1.filter(lambda x:'FAILED' in x)
union=success.union(drop)
print(union.getNumPartitions())
inter=success.intersection(drop)
e2=e1.repartition(5)
print(e2.getNumPartitions())
repartition(n) -> to increase or decrease the number of partitions to n; this involves shuffling...
e3=e1.repartition(2)
#involves shuffling
print(e3.getNumPartitions())
c1=e2.coalesce(2)
# doesn't involve shuffling
print(c1.getNumPartitions())
f1  f2  f3
100 100 100
increasing (shuffle is compulsory):
f1 f2 f3 f4 f5
60 60 60 60 60
decreasing --- repartition or coalesce:
f1  f2
150 150
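To see the per-partition counts sketched above on real data, glom() can be run on the RDDs already defined (e1, e2, c1); a minimal sketch:
print(e1.glom().map(len).collect())   # records in each original partition
print(e2.glom().map(len).collect())   # after repartition(5): records reshuffled roughly evenly across 5 partitions
print(c1.glom().map(len).collect())   # after coalesce(2): existing partitions merged into 2 without a shuffle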
rdd/df --> 10 partitions
hdfs ---> single file
rdd.repartition(1).saveAs(.....)
rdd.coalesce(1).saveAs(...)
# merges the output into a single unit of data
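The saveAs(...) calls above are elided; a sketch using saveAsTextFile with a hypothetical output directory name:
e1.coalesce(1).saveAsTextFile("calllog_single")        # one part file, partitions merged without a full shuffle
# e1.repartition(1).saveAsTextFile("calllog_single2")  # same single file, but shuffles all the data first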
union --> no shuffling
intersection --> shuffling
subtract --> shuffling
cross product (cartesian) --> no shuffling
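subtractByKey from the topic list at the top never gets an example; a short sketch alongside subtract and cartesian, reusing r1, r2 and r3 (result ordering may vary):
print(r1.subtractByKey(r2).collect())                                  # [] here: every key of r1 also appears in r2
print(r3.subtract(spark.sparkContext.parallelize([20,30])).collect())  # removes matching elements -> 40, 50, 60 in some order
print(r3.cartesian(spark.sparkContext.parallelize([1,2])).collect())   # every (r3 element, other element) pair -> 10 tuples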