Integrating Cassandra with Spark - Import / Export data between Spark and Cassandra
Cassandra's default port number : 9042
// Start Cassandra server
$ sudo service Cassandra start
// Verify Cassandra is up
$ netstat -ln | grep 9042
tcp 0 0*
// to start Cassandra Query Language
hadoop@hadoop:~$ cqlsh localhost
Connected to Test Cluster at localhost:9042.
[cqlsh 5.0.1 | Cassandra 3.11.4 | CQL spec 3.4.4 | Native protocol v4]
Use HELP for help.
Cassandra - Columnar Storage NoSQL
- hbase is also a columnar
Cassandra and Hbase are same family members
Hadoop :
Master, Slave Architecture
Name Node, Data Node
purely based on Hadoop
Master : HMaster
Slave : HRegion server
Cassandra : Peer To Peer Architecture
Nodes are logically connected as Circle
Every can interact with every other nodes
There is no Master, Slave things
(cassandra server daemon runs on each nodes)
Hive, MySQL - data stored in the form of Tables
Hive, MySQL, RDBMS: Database (Schema)-> Tables -> Rows -> Columns
KeySpace : Schema
KeySpace -> Tables
pure sql language - cql - cassandra query language
schema is known as Keyspace in Cassandra.
// show all schemas (databases)
cqlsh> describe keyspaces;
people system_schema system_auth system system_distributed system_traces
//SimpleStrategy means (Single DataCentre and Single Rack)
cqlsh> create schema test1 with replication = {'class':'SimpleStrategy','replication_factor':1};
// Must Include DataCentre here
//NetworkTopologyStrategy with DataCentre : Multiple DataCentre and Multiple Racks
cqlsh> create keyspace if not exists test2 with replication = {'class':'NetworkTopologyStrategy','datacentre':1};
cqlsh> describe keyspaces;
test1 system_schema system system_traces
people system_auth system_distributed
CREATE KEYSPACE test1 WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '1'} AND durable_writes = true;
test1 people system_auth system_distributed
test2 system_schema system system_traces
// while creating cassandra table, primary key must be included - primary key is mandatory
cqlsh> create table test1.employee(id int primary key, name text, salary int, dept text);
cqlsh> describe test1.employee;
CREATE TABLE test1.employee (
dept text,
name text,
salary int
) WITH bloom_filter_fp_chance = 0.01
AND caching = {'keys': 'ALL', 'rows_per_partition': 'NONE'}
AND comment = ''
AND compaction = {'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy', 'max_threshold': '32', 'min_threshold': '4'}
AND compression = {'chunk_length_in_kb': '64', 'class': ''}
AND crc_check_chance = 1.0
AND dclocal_read_repair_chance = 0.1
AND default_time_to_live = 0
AND gc_grace_seconds = 864000
AND max_index_interval = 2048
AND memtable_flush_period_in_ms = 0
AND min_index_interval = 128
AND read_repair_chance = 0.0
AND speculative_retry = '99PERCENTILE';
cqlsh> insert into test1.employee(id,name,salary,dept) values (101,'siva',3000,'bigdata');
cqlsh> insert into test1.employee(id,name,salary,dept) values (102,'sakthi',3500,'spark');
cqlsh> insert into test1.employee(id,name,salary,dept) values (103,'prakash',3600,'Java');
cqlsh> select * from test1.employee;
id | dept | name | salary
102 | spark | sakthi | 3500
101 | bigdata | siva | 3000
103 | Java | prakash | 3600
cqlsh> create table test1.student(id int primary key, name text, course text, age int);
cqlsh> insert into test1.student(id,name,course,age) values (200,'Sanmugh','Spark',25);
cqlsh> insert into test1.student(id,name,age,course) values (201,'David',22,'Cassandra');
cqlsh> insert into test1.student(name,id,age,course) values ('stella',203,33,'Kafka');
cqlsh> insert into test1.student(name,id,age) values ('John',204,22);
cqlsh> describe test1;
CREATE KEYSPACE test1 WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '1'} AND durable_writes = true;
CREATE TABLE test1.employee (
dept text,
name text,
salary int
) WITH bloom_filter_fp_chance = 0.01
AND caching = {'keys': 'ALL', 'rows_per_partition': 'NONE'}
AND comment = ''
AND compaction = {'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy', 'max_threshold': '32', 'min_threshold': '4'}
AND compression = {'chunk_length_in_kb': '64', 'class': ''}
AND crc_check_chance = 1.0
AND dclocal_read_repair_chance = 0.1
AND default_time_to_live = 0
AND gc_grace_seconds = 864000
AND max_index_interval = 2048
AND memtable_flush_period_in_ms = 0
AND min_index_interval = 128
AND read_repair_chance = 0.0
AND speculative_retry = '99PERCENTILE';
CREATE TABLE test1.student (
age int,
course text,
name text
) WITH bloom_filter_fp_chance = 0.01
AND caching = {'keys': 'ALL', 'rows_per_partition': 'NONE'}
AND comment = ''
AND compaction = {'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy', 'max_threshold': '32', 'min_threshold': '4'}
AND compression = {'chunk_length_in_kb': '64', 'class': ''}
AND crc_check_chance = 1.0
AND dclocal_read_repair_chance = 0.1
AND default_time_to_live = 0
AND gc_grace_seconds = 864000
AND max_index_interval = 2048
AND memtable_flush_period_in_ms = 0
AND min_index_interval = 128
AND read_repair_chance = 0.0
AND speculative_retry = '99PERCENTILE';
cqlsh> select * from test1.student;
id | age | course | name
201 | 22 | Cassandra | David
204 | 22 | null | John
203 | 33 | Kafka | stella
200 | 25 | Spark | Sanmugh
cqlsh> insert into test1.student(id) values (202);
cqlsh> select * from test1.student;
id | age | course | name
201 | 22 | Cassandra | David
204 | 22 | null | John
203 | 33 | Kafka | stella
200 | 25 | Spark | Sanmugh
202 | null | null | null
Start spark in one more terminal:
$ spark-shell --packages com.datastax.spark:spark-cassandra-connector_2.11:2.4.0 --conf
scala> import com.datastax.spark.connector._
import com.datastax.spark.connector._
scala> val r1 = sc.cassandraTable("test1","employee")
r1: com.datastax.spark.connector.rdd.CassandraTableScanRDD[com.datastax.spark.connector.CassandraRow] = CassandraTableScanRDD[0] at RDD at CassandraRDD.scala:19
scala> r1.collect.foreach(println)
CassandraRow{id: 102, dept: spark, name: sakthi, salary: 3500}
CassandraRow{id: 101, dept: bigdata, name: siva, salary: 3000}
CassandraRow{id: 103, dept: Java, name: prakash, salary: 3600}
scala> val r2 = sc.cassandraTable("test1","student")
r2: com.datastax.spark.connector.rdd.CassandraTableScanRDD[com.datastax.spark.connector.CassandraRow] = CassandraTableScanRDD[1] at RDD at CassandraRDD.scala:19
scala> r2.collect.foreach(println)
CassandraRow{id: 202, age: null, course: null, name: null}
CassandraRow{id: 203, age: 33, course: Kafka, name: stella}
CassandraRow{id: 200, age: 25, course: Spark, name: Sanmugh}
CassandraRow{id: 201, age: 22, course: Cassandra, name: David}
CassandraRow{id: 204, age: 22, course: null, name: John}
//Without using Case Class:
// Adding schema to the RDD just mention the data types only.
scala> val r1 = sc.cassandraTable[(Int,String,String,Int)]("test1","employee")
r1: com.datastax.spark.connector.rdd.CassandraTableScanRDD[(Int, String, String, Int)] = CassandraTableScanRDD[2] at RDD at CassandraRDD.scala:19
// Now it is Tuple Here
scala> r1.collect.foreach(println)
// Converting RDD into Dataframe
scala> val df1 = r1.toDF("id","dept","name","salary");
df1: org.apache.spark.sql.DataFrame = [id: int, dept: string ... 2 more fields]
2019-02-25 12:28:56 WARN ReplicationStrategy$NetworkTopologyStrategy:200 - Error while computing token map for keyspace test2 with datacenter datacentre: could not achieve replication factor 1 (found 0 replicas only), check your keyspace replication settings.
| id| dept| name|salary|
|102| spark| sakthi| 3500 |
|101|bigdata| siva| 3000 |
|103| Java|prakash| 3600 |
//With Using Case Class
scala> case class Emp(id:Int, Dept:String, Name:String, Salary:Int)
defined class Emp
scala> val r1 = sc.cassandraTable[Emp]("test1","employee")
r1: com.datastax.spark.connector.rdd.CassandraTableScanRDD[Emp] = CassandraTableScanRDD[8] at RDD at CassandraRDD.scala:19
// show the records as tuple
scala> r1.collect.foreach(println)
//Making Dataframe from RDD
scala> val df = r1.toDF();
df: org.apache.spark.sql.DataFrame = [id: int, Dept: string ... 2 more fields]
| id| Dept| Name|Salary|
|102| spark| sakthi| 3500|
|101|bigdata| siva| 3000|
|103| Java|prakash| 3600|
// Before applying schema
scala> val r = sc.cassandraTable("test1","student")
r: com.datastax.spark.connector.rdd.CassandraTableScanRDD[com.datastax.spark.connector.CassandraRow] = CassandraTableScanRDD[13] at RDD at CassandraRDD.scala:19
scala> r.collect.foreach(println)
2019-02-25 12:38:03 WARN ReplicationStrategy$NetworkTopologyStrategy:200 - Error while computing token map for keyspace test2 with datacenter datacentre: could not achieve replication factor 1 (found 0 replicas only), check your keyspace replication settings.
CassandraRow{id: 202, age: null, course: null, name: null}
CassandraRow{id: 203, age: 33, course: Kafka, name: stella}
CassandraRow{id: 200, age: 25, course: Spark, name: Sanmugh}
CassandraRow{id: 201, age: 22, course: Cassandra, name: David}
CassandraRow{id: 204, age: 22, course: null, name: John}
// Applying Schema here
scala> val r = sc.cassandraTable[(Int,Int,String,String)]("test1","student")
r: com.datastax.spark.connector.rdd.CassandraTableScanRDD[(Int, Int, String, String)] = CassandraTableScanRDD[14] at RDD at CassandraRDD.scala:19
// We have null values in our data, so we will get exception here
scala> r.collect.foreach(println)
com.datastax.spark.connector.types.TypeConversionException: Failed to convert column age of test1.student to Int: null
//If Non null value is there in table it will work. If Null value is there, it wont work
//Int cannot bring data from null
//We have a record which has except id all the columns are null
//Here we applied Option[DataType] for necessary column to avoid exception
scala> val r = sc.cassandraTable[(Option[Int],Option[Int],Option[String],Option[String])]("test1","student")
r: com.datastax.spark.connector.rdd.CassandraTableScanRDD[(Option[Int], Option[Int], Option[String], Option[String])] = CassandraTableScanRDD[15] at RDD at CassandraRDD.scala:19
// null value will be displayed as None
scala> r.collect.foreach(println)
2019-02-25 12:42:40 WARN ReplicationStrategy$NetworkTopologyStrategy:200 - Error while computing token map for keyspace test2 with datacenter datacentre: could not achieve replication factor 1 (found 0 replicas only), check your keyspace replication settings.
// converting RDD to Dataframe with column headers
scala> val df = r.toDF("id","age","course","name")
df: org.apache.spark.sql.DataFrame = [id: int, age: int ... 2 more fields]
//Null values are displayed here
| id| age|course | name|
|202|null| null| null|
|203| 33| Kafka| stella|
|200| 25| Spark|Sanmugh|
|201| 22| Cassandra| David|
|204| 22| null| John|
// Replacing nulls in course column with Bigdata
scala> val df1 ="Bigdata",Array("course"))
df1: org.apache.spark.sql.DataFrame = [id: int, age: int ... 2 more fields]
| id| age| course| name|
|202|null| Bigdata| null|
|203| 33| Kafka| stella|
|200| 25| Spark|Sanmugh|
|201| 22|Cassandra| David|
|204| 22| Bigdata| John|
//We didnt mention column names, so what ever string fields has null will be replaced with 'Bigdata'
| id| age| course| name|
|202|null| Bigdata|Bigdata| // Here Name is Bigdata -- wrong approach
|203| 33| Kafka| stella|
|200| 25| Spark|Sanmugh|
|201| 22|Cassandra| David|
|204| 22| Bigdata| John|
// We didnt specify column names, so whatever numeric fields which has null values will be replaced with 100
2019-02-25 12:52:11 WARN ReplicationStrategy$NetworkTopologyStrategy:200 - Error while computing token map for keyspace test2 with datacenter datacentre: could not achieve replication factor 1 (found 0 replicas only), check your keyspace replication settings.
| id|age| course| name|
|202|100| Bigdata| null| // Here age is 100 - wrong data
|203| 33| Kafka| stella|
|200| 25| Spark|Sanmugh|
|201| 22|Cassandra| David|
|204| 22| Bigdata| John|
// it will drop which ever rows has null in whatever columns
| id|age| course| name|
|203| 33| Kafka| stella|
|200| 25| Spark|Sanmugh|
|201| 22|Cassandra| David|
// Here we are going to Export inmemory RDD content into Cassandra
//Here we create some in memory collection objects
Start spark in one more terminal:
$ spark-shell --packages com.datastax.spark:spark-cassandra-connector_2.11:2.4.0 --conf
scala> import com.datastax.spark.connector._
import com.datastax.spark.connector._
scala> case class Emp(id:Int, Dept:String,Name:String, Salary:Int)
defined class Emp
scala> val ob1 = new Emp(121,"accounts","Hari",5000);
ob1: Emp = Emp(121,accounts,Hari,5000)
scala> val ob2 = new Emp(122,"HR","Rani",6000);
ob2: Emp = Emp(122,HR,Rani,6000)
scala> val ob3 = new Emp(123,"Marketing","Suresh",6500);
ob3: Emp = Emp(123,Marketing,Suresh,6500)
scala> val r1 = sc.makeRDD(Seq(ob1,ob2,ob3));
r1: org.apache.spark.rdd.RDD[Emp] = ParallelCollectionRDD[40] at makeRDD at <console>:33
scala> r1.collect.foreach(println);
// Export RDD content into Cassandra
scala> r1.saveToCassandra("test1","employee");
// verify the newly inserted rows in Cassandra
cqlsh> select * from test1.employee;
id | dept | name | salary
123 | Marketing | Suresh | 6500 // newly inserted from spark to Cassandra
122 | HR | Rani | 6000 // newly inserted from spark to Cassandra
121 | accounts | Hari | 5000 // newly inserted from spark to Cassandra
102 | spark | sakthi | 3500
101 | bigdata | siva | 3000
103 | Java | prakash | 3600
