Monday, February 25, 2019

Integrating Cassandra with Spark - Import / Export data between Spark and Cassandra
Cassandra's default CQL native transport port : 9042

// Start Cassandra server

$ sudo service cassandra start

// Verify Cassandra is up

$ netstat -ln | grep 9042
tcp        0      0 127.0.0.1:9042          0.0.0.0:* 

// Start the Cassandra Query Language shell (cqlsh)

hadoop@hadoop:~$ cqlsh localhost

Connected to Test Cluster at localhost:9042.
[cqlsh 5.0.1 | Cassandra 3.11.4 | CQL spec 3.4.4 | Native protocol v4]
Use HELP for help.


Cassandra - columnar storage NoSQL database
    - HBase is also columnar
Cassandra and HBase belong to the same family (wide-column stores).

Hadoop :
Master/slave architecture
    Master : Name Node, Slave : Data Node

HBase:
built purely on top of Hadoop
  Master : HMaster
  Slave : HRegion Server

Cassandra : peer-to-peer architecture
Nodes are logically connected as a ring

Every node can interact with every other node.
There is no master/slave concept
(the Cassandra server daemon runs on each node)

Hive, MySQL - data is stored in the form of tables
Hive, MySQL, RDBMS: Database (Schema) -> Tables -> Rows -> Columns

Keyspace : Schema
Keyspace -> Tables

Cassandra has its own SQL-like language - CQL (Cassandra Query Language)

A schema is known as a keyspace in Cassandra.

// Show all keyspaces (the Cassandra equivalent of schemas/databases)
cqlsh> describe keyspaces;

people  system_schema  system_auth  system  system_distributed  system_traces

//SimpleStrategy: single data centre, single rack
cqlsh> create schema test1 with replication = {'class':'SimpleStrategy','replication_factor':1};

//NetworkTopologyStrategy: multiple data centres and multiple racks.
//The replication map key must be an actual data centre name (the default single-node name
//is usually 'datacenter1'). Here 'datacentre' matches no data centre, which is why Spark
//later logs "could not achieve replication factor 1" warnings for keyspace test2.

cqlsh> create keyspace if not exists test2 with replication = {'class':'NetworkTopologyStrategy','datacentre':1};

cqlsh> describe keyspaces;

test1   system_schema  system              system_traces
people  system_auth    system_distributed

// definition of the test1 keyspace
CREATE KEYSPACE test1 WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '1'}  AND durable_writes = true;

// keyspace listing after test2 has been created
test1  people         system_auth  system_distributed
test2  system_schema  system       system_traces


// When creating a Cassandra table, a primary key must be included - the primary key is mandatory

 cqlsh> create table test1.employee(id int primary key, name text, salary int, dept text);

cqlsh> describe test1.employee;

CREATE TABLE test1.employee (
    id int PRIMARY KEY,
    dept text,
    name text,
    salary int
) WITH bloom_filter_fp_chance = 0.01
    AND caching = {'keys': 'ALL', 'rows_per_partition': 'NONE'}
    AND comment = ''
    AND compaction = {'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy', 'max_threshold': '32', 'min_threshold': '4'}
    AND compression = {'chunk_length_in_kb': '64', 'class': 'org.apache.cassandra.io.compress.LZ4Compressor'}
    AND crc_check_chance = 1.0
    AND dclocal_read_repair_chance = 0.1
    AND default_time_to_live = 0
    AND gc_grace_seconds = 864000
    AND max_index_interval = 2048
    AND memtable_flush_period_in_ms = 0
    AND min_index_interval = 128
    AND read_repair_chance = 0.0
    AND speculative_retry = '99PERCENTILE';


cqlsh> insert into test1.employee(id,name,salary,dept) values (101,'siva',3000,'bigdata');

cqlsh> insert into test1.employee(id,name,salary,dept) values (102,'sakthi',3500,'spark');

cqlsh> insert into test1.employee(id,name,salary,dept) values (103,'prakash',3600,'Java');

cqlsh> select * from test1.employee;

 id  | dept    | name    | salary
-----+---------+---------+--------
 102 |   spark |  sakthi |   3500
 101 | bigdata |    siva |   3000
 103 |    Java | prakash |   3600


cqlsh> create table test1.student(id int primary key, name text, course text, age int);
cqlsh> insert into test1.student(id,name,course,age) values (200,'Sanmugh','Spark',25);
cqlsh> insert into test1.student(id,name,age,course) values (201,'David',22,'Cassandra');
cqlsh> insert into test1.student(name,id,age,course) values ('stella',203,33,'Kafka');
cqlsh> insert into test1.student(name,id,age) values ('John',204,22);

cqlsh> describe test1;

CREATE KEYSPACE test1 WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '1'}  AND durable_writes = true;

CREATE TABLE test1.employee (
    id int PRIMARY KEY,
    dept text,
    name text,
    salary int
) WITH bloom_filter_fp_chance = 0.01
    AND caching = {'keys': 'ALL', 'rows_per_partition': 'NONE'}
    AND comment = ''
    AND compaction = {'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy', 'max_threshold': '32', 'min_threshold': '4'}
    AND compression = {'chunk_length_in_kb': '64', 'class': 'org.apache.cassandra.io.compress.LZ4Compressor'}
    AND crc_check_chance = 1.0
    AND dclocal_read_repair_chance = 0.1
    AND default_time_to_live = 0
    AND gc_grace_seconds = 864000
    AND max_index_interval = 2048
    AND memtable_flush_period_in_ms = 0
    AND min_index_interval = 128
    AND read_repair_chance = 0.0
    AND speculative_retry = '99PERCENTILE';

CREATE TABLE test1.student (
    id int PRIMARY KEY,
    age int,
    course text,
    name text
) WITH bloom_filter_fp_chance = 0.01
    AND caching = {'keys': 'ALL', 'rows_per_partition': 'NONE'}
    AND comment = ''
    AND compaction = {'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy', 'max_threshold': '32', 'min_threshold': '4'}
    AND compression = {'chunk_length_in_kb': '64', 'class': 'org.apache.cassandra.io.compress.LZ4Compressor'}
    AND crc_check_chance = 1.0
    AND dclocal_read_repair_chance = 0.1
    AND default_time_to_live = 0
    AND gc_grace_seconds = 864000
    AND max_index_interval = 2048
    AND memtable_flush_period_in_ms = 0
    AND min_index_interval = 128
    AND read_repair_chance = 0.0
    AND speculative_retry = '99PERCENTILE';


cqlsh> select * from test1.student;

 id  | age | course    | name
-----+-----+-----------+---------
 201 |  22 | Cassandra |   David
 204 |  22 |      null |    John
 203 |  33 |     Kafka |  stella
 200 |  25 |     Spark | Sanmugh


cqlsh> insert into test1.student(id) values (202);
cqlsh> select * from test1.student;

 id  | age  | course    | name
-----+------+-----------+---------
 201 |   22 | Cassandra |   David
 204 |   22 |      null |    John
 203 |   33 |     Kafka |  stella
 200 |   25 |     Spark | Sanmugh
 202 | null |      null |    null

Start spark-shell in another terminal:
----------------------------------
$  spark-shell --packages com.datastax.spark:spark-cassandra-connector_2.11:2.4.0 --conf spark.cassandra.connection.host=localhost

scala> import com.datastax.spark.connector._
import com.datastax.spark.connector._


scala> val r1 = sc.cassandraTable("test1","employee")
r1: com.datastax.spark.connector.rdd.CassandraTableScanRDD[com.datastax.spark.connector.CassandraRow] = CassandraTableScanRDD[0] at RDD at CassandraRDD.scala:19

scala> r1.collect.foreach(println)
CassandraRow{id: 102, dept: spark, name: sakthi, salary: 3500}               
CassandraRow{id: 101, dept: bigdata, name: siva, salary: 3000}
CassandraRow{id: 103, dept: Java, name: prakash, salary: 3600}

scala> val r2 = sc.cassandraTable("test1","student")
r2: com.datastax.spark.connector.rdd.CassandraTableScanRDD[com.datastax.spark.connector.CassandraRow] = CassandraTableScanRDD[1] at RDD at CassandraRDD.scala:19

scala> r2.collect.foreach(println)
CassandraRow{id: 202, age: null, course: null, name: null}                   
CassandraRow{id: 203, age: 33, course: Kafka, name: stella}
CassandraRow{id: 200, age: 25, course: Spark, name: Sanmugh}
CassandraRow{id: 201, age: 22, course: Cassandra, name: David}
CassandraRow{id: 204, age: 22, course: null, name: John}
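
Besides the RDD API above, connector 2.x also exposes a DataFrame read path. A minimal sketch, assuming the same spark-cassandra-connector 2.4.0 package started above; the value name empDF is just illustrative:

// Read the same Cassandra table through the connector's DataFrame source
val empDF = spark.read
  .format("org.apache.spark.sql.cassandra")
  .options(Map("keyspace" -> "test1", "table" -> "employee"))
  .load()

empDF.printSchema()   // the schema is inferred from the Cassandra table metadata
empDF.show()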

// Without using a case class:
// To add a schema to the RDD, just mention the data types.
scala> val r1 = sc.cassandraTable[(Int,String,String,Int)]("test1","employee")
r1: com.datastax.spark.connector.rdd.CassandraTableScanRDD[(Int, String, String, Int)] = CassandraTableScanRDD[2] at RDD at CassandraRDD.scala:19

// Each row is now a tuple
scala> r1.collect.foreach(println)
(102,spark,sakthi,3500)                                                       
(101,bigdata,siva,3000)
(103,Java,prakash,3600)

// Converting the RDD into a DataFrame
scala> val df1 = r1.toDF("id","dept","name","salary");
df1: org.apache.spark.sql.DataFrame = [id: int, dept: string ... 2 more fields]

scala> df1.show
2019-02-25 12:28:56 WARN  ReplicationStrategy$NetworkTopologyStrategy:200 - Error while computing token map for keyspace test2 with datacenter datacentre: could not achieve replication factor 1 (found 0 replicas only), check your keyspace replication settings.
+---+-------+-------+------+
| id|   dept|   name|salary|
+---+-------+-------+------+
|102|  spark| sakthi|  3500|
|101|bigdata|   siva|  3000|
|103|   Java|prakash|  3600|
+---+-------+-------+------+


// Using a case class
scala> case class Emp(id:Int, Dept:String, Name:String, Salary:Int)
defined class Emp

scala> val r1 = sc.cassandraTable[Emp]("test1","employee")
r1: com.datastax.spark.connector.rdd.CassandraTableScanRDD[Emp] = CassandraTableScanRDD[8] at RDD at CassandraRDD.scala:19

// the records are shown as Emp case class instances
scala> r1.collect.foreach(println)
Emp(102,spark,sakthi,3500)                                                   
Emp(101,bigdata,siva,3000)
Emp(103,Java,prakash,3600)

// Making a DataFrame from the RDD
scala> val df = r1.toDF();
df: org.apache.spark.sql.DataFrame = [id: int, Dept: string ... 2 more fields]

scala> df.show

+---+-------+-------+------+                                                 
| id|   Dept|   Name|Salary|
+---+-------+-------+------+
|102|  spark| sakthi|  3500|
|101|bigdata|   siva|  3000|
|103|   Java|prakash|  3600|
+---+-------+-------+------+
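
Once the table is available as a DataFrame, it can also be queried with Spark SQL. A small sketch (the view name emp and the aggregation are just illustrative):

// Register the DataFrame as a temporary view and query it with SQL
df.createOrReplaceTempView("emp")

spark.sql("select Dept, avg(Salary) as avg_salary from emp group by Dept").show()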

// Before applying schema

scala> val r = sc.cassandraTable("test1","student")

r: com.datastax.spark.connector.rdd.CassandraTableScanRDD[com.datastax.spark.connector.CassandraRow] = CassandraTableScanRDD[13] at RDD at CassandraRDD.scala:19

scala> r.collect.foreach(println)

2019-02-25 12:38:03 WARN  ReplicationStrategy$NetworkTopologyStrategy:200 - Error while computing token map for keyspace test2 with datacenter datacentre: could not achieve replication factor 1 (found 0 replicas only), check your keyspace replication settings.
CassandraRow{id: 202, age: null, course: null, name: null}
CassandraRow{id: 203, age: 33, course: Kafka, name: stella}
CassandraRow{id: 200, age: 25, course: Spark, name: Sanmugh}
CassandraRow{id: 201, age: 22, course: Cassandra, name: David}
CassandraRow{id: 204, age: 22, course: null, name: John}

// Applying Schema here
scala> val r = sc.cassandraTable[(Int,Int,String,String)]("test1","student")
r: com.datastax.spark.connector.rdd.CassandraTableScanRDD[(Int, Int, String, String)] = CassandraTableScanRDD[14] at RDD at CassandraRDD.scala:19

// We have null values in our data, so we will get an exception here
scala> r.collect.foreach(println)
com.datastax.spark.connector.types.TypeConversionException: Failed to convert column age of test1.student to Int: null


// If the table has only non-null values this works; if any value is null, it fails.
// An Int cannot hold a null value.
// We have a record where every column except id is null.

// Here we apply Option[DataType] to the nullable columns to avoid the exception.

scala> val r = sc.cassandraTable[(Option[Int],Option[Int],Option[String],Option[String])]("test1","student")
r: com.datastax.spark.connector.rdd.CassandraTableScanRDD[(Option[Int], Option[Int], Option[String], Option[String])] = CassandraTableScanRDD[15] at RDD at CassandraRDD.scala:19

// null values will be displayed as None

scala> r.collect.foreach(println)
2019-02-25 12:42:40 WARN  ReplicationStrategy$NetworkTopologyStrategy:200 - Error while computing token map for keyspace test2 with datacenter datacentre: could not achieve replication factor 1 (found 0 replicas only), check your keyspace replication settings.
(Some(202),None,None,None)                                                   
(Some(203),Some(33),Some(Kafka),Some(stella))
(Some(200),Some(25),Some(Spark),Some(Sanmugh))
(Some(201),Some(22),Some(Cassandra),Some(David))
(Some(204),Some(22),None,Some(John))
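
Before converting to a DataFrame, the Option values can also be flattened at the RDD level with getOrElse, if plain defaults are preferred over None. A sketch (the default values 0 and "NA" are just examples):

// Replace None with plain defaults at the RDD level
val flattened = r.map { case (id, age, course, name) =>
  (id.getOrElse(0), age.getOrElse(0), course.getOrElse("NA"), name.getOrElse("NA"))
}
flattened.collect.foreach(println)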

// Converting the RDD to a DataFrame with column headers

scala> val df = r.toDF("id","age","course","name")
df: org.apache.spark.sql.DataFrame = [id: int, age: int ... 2 more fields]

//Null values are displayed here

scala> df.show
+---+----+----------+-------+
| id| age|    course|   name|
+---+----+----------+-------+
|202|null|      null|   null|
|203|  33|     Kafka| stella|
|200|  25|     Spark|Sanmugh|
|201|  22| Cassandra|  David|
|204|  22|      null|   John|
+---+----+----------+-------+

// Replacing nulls in course column with Bigdata

scala> val df1 = df.na.fill("Bigdata",Array("course"))
df1: org.apache.spark.sql.DataFrame = [id: int, age: int ... 2 more fields]


scala> df1.show
+---+----+---------+-------+
| id| age|   course|   name|
+---+----+---------+-------+
|202|null|  Bigdata|   null|
|203|  33|    Kafka| stella|
|200|  25|    Spark|Sanmugh|
|201|  22|Cassandra|  David|
|204|  22|  Bigdata|   John|
+---+----+---------+-------+


// We didn't mention column names, so every string field that has a null will be replaced with 'Bigdata'

scala> df1.na.fill("Bigdata").show
+---+----+---------+-------+
| id| age|   course|   name|
+---+----+---------+-------+
|202|null|  Bigdata|Bigdata|  // Here Name is Bigdata -- wrong approach
|203|  33|    Kafka| stella|
|200|  25|    Spark|Sanmugh|
|201|  22|Cassandra|  David|
|204|  22|  Bigdata|   John|
+---+----+---------+-------+

// We didn't specify column names, so every numeric field that has a null value will be replaced with 100

scala> df1.na.fill(100).show
2019-02-25 12:52:11 WARN  ReplicationStrategy$NetworkTopologyStrategy:200 - Error while computing token map for keyspace test2 with datacenter datacentre: could not achieve replication factor 1 (found 0 replicas only), check your keyspace replication settings.
+---+---+---------+-------+                                                   
| id|age|   course|   name|
+---+---+---------+-------+
|202|100|  Bigdata|   null| // Here age is 100 - wrong data
|203| 33|    Kafka| stella|
|200| 25|    Spark|Sanmugh|
|201| 22|Cassandra|  David|
|204| 22|  Bigdata|   John|
+---+---+---------+-------+
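
To avoid filling the wrong columns, na.fill also accepts a Map from column name to replacement value, so each column gets its own default. A sketch (the replacement values are just examples):

// Fill each column with its own default instead of one blanket value
val df2 = df.na.fill(Map(
  "course" -> "Bigdata",
  "name"   -> "unknown",
  "age"    -> 0
))
df2.show()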


// Drops every row that has a null in any column
scala> df.na.drop().show

+---+---+---------+-------+
| id|age|   course|   name|
+---+---+---------+-------+
|203| 33|    Kafka| stella|
|200| 25|    Spark|Sanmugh|
|201| 22|Cassandra|  David|
+---+---+---------+-------+
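
na.drop can also be restricted to a subset of columns, dropping only the rows that are null in those columns. A sketch on the same student DataFrame:

// Drop only rows where the name column is null; nulls in other columns are kept
df.na.drop(Seq("name")).show()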


// Here we are going to export in-memory RDD content into Cassandra


// First we create some in-memory objects

Start spark-shell in another terminal:
----------------------------------
$  spark-shell --packages com.datastax.spark:spark-cassandra-connector_2.11:2.4.0 --conf spark.cassandra.connection.host=localhost

scala> import com.datastax.spark.connector._
import com.datastax.spark.connector._

scala> case class Emp(id:Int, Dept:String,Name:String, Salary:Int)
defined class Emp

scala> val ob1 = new Emp(121,"accounts","Hari",5000);
ob1: Emp = Emp(121,accounts,Hari,5000)

scala> val ob2 = new Emp(122,"HR","Rani",6000);
ob2: Emp = Emp(122,HR,Rani,6000)

scala> val ob3 = new Emp(123,"Marketing","Suresh",6500);
ob3: Emp = Emp(123,Marketing,Suresh,6500)

scala> val r1 = sc.makeRDD(Seq(ob1,ob2,ob3));
r1: org.apache.spark.rdd.RDD[Emp] = ParallelCollectionRDD[40] at makeRDD at <console>:33

scala> r1.collect.foreach(println);
Emp(121,accounts,Hari,5000)
Emp(122,HR,Rani,6000)
Emp(123,Marketing,Suresh,6500)

// Export RDD content into Cassandra
scala> r1.saveToCassandra("test1","employee");

// verify the newly inserted rows in Cassandra
cqlsh> select * from test1.employee;

 id  | dept      | name    | salary
-----+-----------+---------+--------
 123 | Marketing |  Suresh |   6500   // newly inserted from Spark into Cassandra
 122 |        HR |    Rani |   6000   // newly inserted from Spark into Cassandra
 121 |  accounts |    Hari |   5000   // newly inserted from Spark into Cassandra
 102 |     spark |  sakthi |   3500
 101 |   bigdata |    siva |   3000
 103 |      Java | prakash |   3600
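
The connector can also write a DataFrame back to Cassandra, not only an RDD via saveToCassandra. A minimal sketch, assuming connector 2.4.0; the DataFrame column names must match the Cassandra table's (lower-case) column names:

// Build a DataFrame whose column names match the test1.employee table
val empDF = r1.toDF("id", "dept", "name", "salary")

// DataFrame write path of the connector; in Cassandra an insert with an
// existing primary key simply overwrites that row
empDF.write
  .format("org.apache.spark.sql.cassandra")
  .options(Map("keyspace" -> "test1", "table" -> "employee"))
  .mode("append")
  .save()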
