Cassandra and Spark Integration
// Start the Cassandra server and enable it on boot
$ sudo service cassandra start
$ sudo update-rc.d cassandra defaults
// Start the CQL shell (CLI) for Cassandra
$ cqlsh localhost
Connected to Test Cluster at localhost:9042.
[cqlsh 5.0.1 | Cassandra 3.11.4 | CQL spec 3.4.4 | Native protocol v4]
cqlsh> describe cluster;
Cluster: Test Cluster
Partitioner: Murmur3Partitioner
cqlsh> describe keyspaces;
system_traces system_schema system_auth system system_distributed
cqlsh> CREATE KEYSPACE people with replication = {'class':'SimpleStrategy','replication_factor':1};
cqlsh> use people;
cqlsh:people> CREATE TABLE users (
... id varchar,
... first_name varchar,
... last_name varchar,
... city varchar,
... emails varchar,
... PRIMARY KEY (id));
cqlsh:people> insert into users(id,first_name,last_name,city,emails) values ('101','Sankara','narayanan','PLTR','sa@sa.com');
cqlsh:people> insert into users(id,first_name,last_name,city,emails) values ('102','Harish','Kalyan','CHN','ha@ka.in');
cqlsh:people> select * from users;
 id  | city | emails    | first_name | last_name
-----+------+-----------+------------+-----------
 102 | CHN  | ha@ka.in  | Harish     | Kalyan
 101 | PLTR | sa@sa.com | Sankara    | narayanan
cqlsh:people> describe users;
CREATE TABLE people.users (
id text PRIMARY KEY,
city text,
emails text,
first_name text,
last_name text
) WITH bloom_filter_fp_chance = 0.01
AND caching = {'keys': 'ALL', 'rows_per_partition': 'NONE'}
AND comment = ''
AND compaction = {'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy', 'max_threshold': '32', 'min_threshold': '4'}
AND compression = {'chunk_length_in_kb': '64', 'class': 'org.apache.cassandra.io.compress.LZ4Compressor'}
AND crc_check_chance = 1.0
AND dclocal_read_repair_chance = 0.1
AND default_time_to_live = 0
AND gc_grace_seconds = 864000
AND max_index_interval = 2048
AND memtable_flush_period_in_ms = 0
AND min_index_interval = 128
AND read_repair_chance = 0.0
AND speculative_retry = '99PERCENTILE';
// Get the Spark Cassandra Connector from the Maven repository:
<!-- https://mvnrepository.com/artifact/com.datastax.spark/spark-cassandra-connector -->
<dependency>
<groupId>com.datastax.spark</groupId>
<artifactId>spark-cassandra-connector_2.11</artifactId>
<version>2.4.0</version>
</dependency>
Or reference it by its package coordinates (used with spark-shell --packages below):
com.datastax.spark:spark-cassandra-connector_2.11:2.4.0
----------------------------------
Alternatively, download the connector JAR directly from: https://repo1.maven.org/maven2/com/datastax/spark/spark-cassandra-connector_2.11/2.4.0/spark-cassandra-connector_2.11-2.4.0.jar
// https://mvnrepository.com/artifact/com.datastax.spark/spark-cassandra-connector
libraryDependencies += "com.datastax.spark" %% "spark-cassandra-connector" % "2.4.0"
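With the sbt dependency in place, you can also read the table from a standalone application instead of spark-shell. A minimal sketch, assuming the keyspace and table created above (the object name CassandraApp is made up):

import org.apache.spark.sql.SparkSession

object CassandraApp {
  def main(args: Array[String]): Unit = {
    // Point the connector at the local Cassandra node
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("cassandra-app")
      .config("spark.cassandra.connection.host", "localhost")
      .getOrCreate()

    // Read the people.users table as a DataFrame
    val users = spark.read
      .format("org.apache.spark.sql.cassandra")
      .options(Map("keyspace" -> "people", "table" -> "users"))
      .load()
    users.show()

    spark.stop()
  }
}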
----------------------------------
// Launch spark-shell with the connector package:
spark-shell --packages com.datastax.spark:spark-cassandra-connector_2.11:2.4.0
scala> import com.datastax.spark.connector._
scala> import org.apache.spark.SparkConf
scala> import org.apache.spark.SparkContext
scala> import org.apache.spark.SparkContext._
scala> val conf = new SparkConf().setMaster("local").setAppName("sample cassandra app").set("spark.cassandra.connection.host","localhost").set("spark.driver.allowMultipleContexts","true") // allowMultipleContexts permits a second SparkContext next to the shell's built-in sc
scala> val sc = new SparkContext(conf)
scala> val personRDD = sc.cassandraTable("people","users") // KeySpace, Table name
scala> personRDD.take(2).foreach(println)
CassandraRow{id: 101, city: PLTR, emails: sa@sa.com, first_name: Sankara, last_name: narayanan}
CassandraRow{id: 102, city: CHN, emails: ha@ka.in, first_name: Harish, last_name: Kalyan}
scala> personRDD.count
res1: Long = 2
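Besides untyped CassandraRow objects, the connector can map rows to a case class, and an RDD can be written back with saveToCassandra. A short sketch in the same shell session (the extra row below is made up for illustration):

scala> case class User(id: String, firstName: String, lastName: String, city: String, emails: String) // camelCase fields map to snake_case columns
scala> val typedRDD = sc.cassandraTable[User]("people", "users")

scala> // Write a new row back; tuple fields follow the order given in SomeColumns
scala> val newUsers = sc.parallelize(Seq(("103", "Ada", "Lovelace", "LDN", "al@ex.com")))
scala> newUsers.saveToCassandra("people", "users", SomeColumns("id", "first_name", "last_name", "city", "emails"))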
scala> val df = spark.read.format("org.apache.spark.sql.cassandra").options(Map("keyspace" -> "people", "table" -> "users")).load // spark is the shell's built-in SparkSession
scala> df.show
+---+----+---------+----------+---------+
| id|city| emails|first_name|last_name|
+---+----+---------+----------+---------+
|101|PLTR|sa@sa.com| Sankara|narayanan|
|102| CHN| ha@ka.in| Harish| Kalyan|
+---+----+---------+----------+---------+
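The DataFrame API can also filter and write back to Cassandra through the same format. A brief sketch continuing the session (the filter value is just an example; Cassandra writes are upserts keyed on the primary key):

scala> // A non-indexed regular column filter is typically applied by Spark; filters on id would be pushed down to Cassandra
scala> val chennaiUsers = df.filter(df("city") === "CHN")
scala> chennaiUsers.select("id", "first_name").show()
scala> chennaiUsers.write.format("org.apache.spark.sql.cassandra").options(Map("keyspace" -> "people", "table" -> "users")).mode("append").save()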