Thursday, February 14, 2019

Spark Full Outer Join Interview Question Scenario


Interview Problem:

rdd1 = (2,112,acb), (10,112,vbg), (1,113,jik)
rdd2 = (1,111,vcx), (9,777,bvc)

I have two RDDs and have to join them in such a way that any key common to both RDDs (here, key 1) is removed, keeping only the records whose key appears in exactly one RDD.

Result = (2,112,acb), (10,112,vbg), (9,777,bvc)


Method 1:

Key both RDDs on the first field, fullOuterJoin them, then keep only the keys that came from one side.
scala> val rdd1=sc.makeRDD(Array((2,112,"acb"),(10,112,"vbg"),(1,113,"jik")))

rdd1: org.apache.spark.rdd.RDD[(Int, Int, String)] = ParallelCollectionRDD[37] at makeRDD at <console>:24

scala> val rdd2=sc.makeRDD(Array((1,111,"vcx"),(9,777,"bvc")))

rdd2: org.apache.spark.rdd.RDD[(Int, Int, String)] = ParallelCollectionRDD[38] at makeRDD at <console>:24

scala> val rdd1_map=rdd1.map(x=>
     | (x._1,(x._2,x._3)))
rdd1_map: org.apache.spark.rdd.RDD[(Int, (Int, String))] = MapPartitionsRDD[39] at map at <console>:26

scala> val rdd2_map=rdd2.map(x=>
     | (x._1,(x._2,x._3)))

rdd2_map: org.apache.spark.rdd.RDD[(Int, (Int, String))] = MapPartitionsRDD[44] at map at <console>:26

scala> val rdd_join=rdd1_map.fullOuterJoin(rdd2_map)

rdd_join: org.apache.spark.rdd.RDD[(Int, (Option[(Int, String)], Option[(Int, String)]))] = MapPartitionsRDD[47] at fullOuterJoin at <console>:32

scala> rdd_join.take(10).foreach(println)

(1,(Some((113,jik)),Some((111,vcx))))
(9,(None,Some((777,bvc))))
(10,(Some((112,vbg)),None))
(2,(Some((112,acb)),None))

A key that exists in both RDDs comes back with Some on both sides, while a key unique to one RDD has None on the other side. So keep only the rows where one side is None:

scala> val rdd_res=rdd_join.filter(x=>(x._2._1 == None) || (x._2._2 == None))

rdd_res: org.apache.spark.rdd.RDD[(Int, (Option[(Int, String)], Option[(Int, String)]))] = MapPartitionsRDD[48] at filter at <console>:34

scala> rdd_res.take(10).foreach(println)

(9,(None,Some((777,bvc))))
(10,(Some((112,vbg)),None))
(2,(Some((112,acb)),None))

Pick whichever side is defined (after the filter, exactly one side of each pair is Some):

scala> val rdd_final=rdd_res.mapValues(x=>x._1.orElse(x._2))

scala> rdd_final.take(5).foreach(println)

(9,Some((777,bvc)))
(10,Some((112,vbg)))
(2,Some((112,acb)))

Finally, unwrap the Option; every remaining value is defined at this point, so get is safe:

scala> val rdd_final2=rdd_final.mapValues(x=>x.get)

scala> rdd_final2.take(5).foreach(println)

(9,(777,bvc))
(10,(112,vbg))
(2,(112,acb))
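
For reference, the filter and the Option unwrapping can be collapsed into a single flatMap over the joined RDD. A minimal sketch, assuming the same rdd1_map and rdd2_map as above (rdd_onepass is just an illustrative name):

// A key present in both RDDs has Some on both sides, so drop it;
// otherwise exactly one side is defined, so orElse(...).get is safe.
val rdd_onepass = rdd1_map.fullOuterJoin(rdd2_map).flatMap {
  case (_, (Some(_), Some(_))) => None
  case (k, (left, right))      => Some((k, left.orElse(right).get))
}

rdd_onepass.collect should give the same three records as rdd_final2.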

Another Method:

subtractByKey removes every pair whose key also appears in the other RDD, so subtracting in both directions and unioning the results gives the same answer:
scala> val rdd_sub1=rdd1_map.subtractByKey(rdd2_map)

rdd_sub1: org.apache.spark.rdd.RDD[(Int, (Int, String))] = SubtractedRDD[84] at subtractByKey at <console>:32

scala> rdd_sub1.collect

res61: Array[(Int, (Int, String))] = Array((2,(112,acb)), (10,(112,vbg)))

scala> val rdd_sub2=rdd2_map.subtractByKey(rdd1_map)

rdd_sub2: org.apache.spark.rdd.RDD[(Int, (Int, String))] = SubtractedRDD[85] at subtractByKey at <console>:32

scala> rdd_sub2.collect

res62: Array[(Int, (Int, String))] = Array((9,(777,bvc)))

scala> val res=rdd_sub1.union(rdd_sub2)

res: org.apache.spark.rdd.RDD[(Int, (Int, String))] = PartitionerAwareUnionRDD[86] at union at <console>:36

scala> res.take(5).foreach(println)

(2,(112,acb))
(10,(112,vbg))
(9,(777,bvc))
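
The same symmetric difference can also be written with the DataFrame API using two left_anti joins. A minimal sketch, assuming a SparkSession named spark and the original rdd1/rdd2; the column names k, num and str are made up for illustration:

import spark.implicits._

// left_anti keeps rows whose key has no match on the other side,
// so the union of both directions is exactly the required result.
val df1 = rdd1.toDF("k", "num", "str")
val df2 = rdd2.toDF("k", "num", "str")
val dfRes = df1.join(df2, Seq("k"), "left_anti")
  .union(df2.join(df1, Seq("k"), "left_anti"))

dfRes.show()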
