Export a Spark RDD into a Cassandra Table
// Here we export the contents of an in-memory RDD into a Cassandra table.
// First we create some in-memory collection objects.
Start spark-shell in another terminal:
----------------------------------
$ spark-shell --packages com.datastax.spark:spark-cassandra-connector_2.11:2.4.0 --conf spark.cassandra.connection.host=localhost
scala> import com.datastax.spark.connector._
import com.datastax.spark.connector._
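The steps below assume the keyspace test1 and the table employee already exist (the cqlsh output further down shows three earlier rows). If they don't, here is a minimal sketch of creating them from the same session using the connector's CassandraConnector helper; the SimpleStrategy replication settings are an assumption for a single-node setup, not part of the original transcript:

scala> import com.datastax.spark.connector.cql.CassandraConnector
scala> CassandraConnector(sc.getConf).withSessionDo { session =>
     |   // create the keyspace and table used below (names taken from this transcript)
     |   session.execute("CREATE KEYSPACE IF NOT EXISTS test1 WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1}")
     |   session.execute("CREATE TABLE IF NOT EXISTS test1.employee (id int PRIMARY KEY, dept text, name text, salary int)")
     | }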
scala> case class Emp(id: Int, dept: String, name: String, salary: Int)
defined class Emp
scala> val ob1 = Emp(121, "accounts", "Hari", 5000)
ob1: Emp = Emp(121,accounts,Hari,5000)
scala> val ob2 = Emp(122, "HR", "Rani", 6000)
ob2: Emp = Emp(122,HR,Rani,6000)
scala> val ob3 = Emp(123, "Marketing", "Suresh", 6500)
ob3: Emp = Emp(123,Marketing,Suresh,6500)
scala> val r1 = sc.makeRDD(Seq(ob1, ob2, ob3))
r1: org.apache.spark.rdd.RDD[Emp] = ParallelCollectionRDD[40] at makeRDD at <console>:33
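makeRDD is simply an alias for parallelize; both accept an optional partition count. For illustration (the name r1b and the partition count 2 are not from the transcript):

scala> val r1b = sc.parallelize(Seq(ob1, ob2, ob3), 2)  // same data, built with parallelize across 2 partitions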
scala> r1.collect.foreach(println)
Emp(121,accounts,Hari,5000)
Emp(122,HR,Rani,6000)
Emp(123,Marketing,Suresh,6500)
// Export RDD content into Cassandra
scala> r1.saveToCassandra("test1", "employee")
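saveToCassandra relies here on the default mapping from case-class field names to column names. The connector also lets you name the target columns explicitly with SomeColumns (already in scope from the earlier import); a sketch against the same table:

scala> r1.saveToCassandra("test1", "employee", SomeColumns("id", "dept", "name", "salary"))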
// verify the newly inserted rows in Cassandra
cqlsh> select * from test1.employee;

 id  | dept      | name    | salary
-----+-----------+---------+--------
 123 | Marketing |  Suresh |   6500     // newly inserted from Spark
 122 | HR        |    Rani |   6000     // newly inserted from Spark
 121 | accounts  |    Hari |   5000     // newly inserted from Spark
 102 | spark     |  sakthi |   3500
 101 | bigdata   |    siva |   3000
 103 | Java      | prakash |   3600
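Instead of switching to cqlsh, the round trip can also be verified from the same spark-shell session: the connector adds a cassandraTable method to the SparkContext, and rows can be mapped straight back into the Emp case class. A sketch, assuming the import and case class above:

scala> val back = sc.cassandraTable[Emp]("test1", "employee")   // back is an illustrative name
scala> back.collect.foreach(println)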