Monday, February 25, 2019

Export Spark RDD into Cassandra Table

In this post we export the contents of an in-memory Spark RDD into a Cassandra table. First we create a few objects in memory, build an RDD from them in the spark-shell, and then save it with the spark-cassandra-connector.
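Note that saveToCassandra writes into an existing table; in this example test1.employee was created earlier (it already holds the rows with ids 101-103 shown at the end). If you need to set it up from scratch, a schema like the one below would match the case class used further down. The keyspace replication settings here are just an assumption for a local single-node Cassandra:

cqlsh> CREATE KEYSPACE IF NOT EXISTS test1
   ... WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1};

cqlsh> CREATE TABLE IF NOT EXISTS test1.employee (
   ...     id int PRIMARY KEY,
   ...     dept text,
   ...     name text,
   ...     salary int
   ... );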

Start the spark-shell in another terminal with the spark-cassandra-connector package and the Cassandra host set:
-----------------------------------------------------------------------------------------------------------

$  spark-shell --packages com.datastax.spark:spark-cassandra-connector_2.11:2.4.0 --conf spark.cassandra.connection.host=localhost

scala> import com.datastax.spark.connector._
import com.datastax.spark.connector._

scala> case class Emp(id:Int, Dept:String,Name:String, Salary:Int)
defined class Emp

scala> val ob1 = new Emp(121,"accounts","Hari",5000);
ob1: Emp = Emp(121,accounts,Hari,5000)

scala> val ob2 = new Emp(122,"HR","Rani",6000);
ob2: Emp = Emp(122,HR,Rani,6000)

scala> val ob3 = new Emp(123,"Marketing","Suresh",6500);
ob3: Emp = Emp(123,Marketing,Suresh,6500)

scala> val r1 = sc.makeRDD(Seq(ob1,ob2,ob3));
r1: org.apache.spark.rdd.RDD[Emp] = ParallelCollectionRDD[40] at makeRDD at <console>:33

scala> r1.collect.foreach(println);
Emp(121,accounts,Hari,5000)
Emp(122,HR,Rani,6000)
Emp(123,Marketing,Suresh,6500)

// Export RDD content into Cassandra
scala> r1.saveToCassandra("test1","employee");
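The connector maps the Emp fields (id, Dept, Name, Salary) to the table columns (id, dept, name, salary) by name. The columns to write can also be listed explicitly with SomeColumns, which comes in with the same connector import. This is a small variation on the call above, not part of the original run:

scala> r1.saveToCassandra("test1", "employee", SomeColumns("id", "dept", "name", "salary"));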

// Verify the newly inserted rows in Cassandra (back in the cqlsh terminal)
cqlsh> select * from test1.employee;
 id  | dept      | name    | salary
-----+-----------+---------+--------
 123 | Marketing |  Suresh |   6500 // newly inserted from spark to Cassandra
 122 |        HR |    Rani |   6000 // newly inserted from spark to Cassandra
 121 |  accounts |    Hari |   5000 // newly inserted from spark to Cassandra
 102 |     spark |  sakthi |   3500
 101 |   bigdata |    siva |   3000
 103 |      Java | prakash |   3600
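
The same connector import also adds a cassandraTable method to the SparkContext, so the result can be checked from the spark-shell as well. A quick read-back sketch, assuming the same session:

// Read the table back into an RDD of CassandraRow objects and print it
scala> sc.cassandraTable("test1", "employee").collect.foreach(println);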

