Interview Problem:
Rdd1=(2,112,acb),(10,112,vbg),(1,113,jik)
Rdd2=(1,111,vcx),(9,777,bvc)
I have two RDDs. Join them in such a way that rows whose keys appear in both RDDs are removed — i.e. keep only the keys unique to each side (a symmetric difference by key).
Result=(2,112,acb),(10,112,vbg),(9,777,bvc)
Method 1:
scala> val rdd1=sc.makeRDD(Array((2,112,"acb"),(10,112,"vbg"),(1,113,"jik")))
rdd1: org.apache.spark.rdd.RDD[(Int, Int, String)] = ParallelCollectionRDD[37] at makeRDD at <console>:24
scala> val rdd2=sc.makeRDD(Array((1,111,"vcx"),(9,777,"bvc")))
rdd2: org.apache.spark.rdd.RDD[(Int, Int, String)] = ParallelCollectionRDD[38] at makeRDD at <console>:24
scala> val rdd1_map=rdd1.map(x=>
| (x._1,(x._2,x._3)))
rdd1_map: org.apache.spark.rdd.RDD[(Int, (Int, String))] = MapPartitionsRDD[39] at map at <console>:26
scala> val rdd2_map=rdd2.map(x=>
| (x._1,(x._2,x._3)))
rdd2_map: org.apache.spark.rdd.RDD[(Int, (Int, String))] = MapPartitionsRDD[44] at map at <console>:26
scala> val rdd_join=rdd1_map.fullOuterJoin(rdd2_map)
rdd_join: org.apache.spark.rdd.RDD[(Int, (Option[(Int, String)], Option[(Int, String)]))] = MapPartitionsRDD[47] at fullOuterJoin at <console>:32
scala> rdd_join.take(10).foreach(println)
(1,(Some((113,jik)),Some((111,vcx))))
(9,(None,Some((777,bvc))))
(10,(Some((112,vbg)),None))
(2,(Some((112,acb)),None))
scala> val rdd_res=rdd_join.filter(x=>(x._2._1 == None) || (x._2._2 == None))
rdd_res: org.apache.spark.rdd.RDD[(Int, (Option[(Int, String)], Option[(Int, String)]))] = MapPartitionsRDD[48] at filter at <console>:34
scala> rdd_res.take(10).foreach(println)
(9,(None,Some((777,bvc))))
(10,(Some((112,vbg)),None))
(2,(Some((112,acb)),None))
scala> val rdd_final=rdd_res.mapValues(x=>if(x._1.isDefined) x._1 else x._2)
rdd_final: org.apache.spark.rdd.RDD[(Int, Option[(Int, String)])] = MapPartitionsRDD[49] at mapValues at <console>:36
scala> rdd_final.take(5).foreach(println)
(9,Some((777,bvc)))
(10,Some((112,vbg)))
(2,Some((112,acb)))
scala> val rdd_final2=rdd_final.mapValues(x=>x.getOrElse((0,"")))
scala> rdd_final2.take(5).foreach(println)
(9,(777,bvc))
(10,(112,vbg))
(2,(112,acb))
Another Method:
scala> val rdd_sub1=rdd1_map.subtractByKey(rdd2_map)
rdd_sub1: org.apache.spark.rdd.RDD[(Int, (Int, String))] = SubtractedRDD[84] at subtractByKey at <console>:32
scala> rdd_sub1.collect
res61: Array[(Int, (Int, String))] = Array((2,(112,acb)), (10,(112,vbg)))
scala> val rdd_sub2=rdd2_map.subtractByKey(rdd1_map)
rdd_sub2: org.apache.spark.rdd.RDD[(Int, (Int, String))] = SubtractedRDD[85] at subtractByKey at <console>:32
scala> rdd_sub2.collect
res62: Array[(Int, (Int, String))] = Array((9,(777,bvc)))
scala> val res=rdd_sub1.union(rdd_sub2)
res: org.apache.spark.rdd.RDD[(Int, (Int, String))] = PartitionerAwareUnionRDD[86] at union at <console>:36
scala> res.take(5).foreach(println)
(2,(112,acb))
(10,(112,vbg))
(9,(777,bvc))
No comments:
Post a Comment