Monday, February 25, 2019

Cassandra and Spark Integration

// start the Cassandra server

$ sudo service cassandra start

// enable Cassandra at boot

$ sudo update-rc.d cassandra defaults


// start cqlsh, the Cassandra CLI

$ cqlsh localhost
Connected to Test Cluster at localhost:9042.
[cqlsh 5.0.1 | Cassandra 3.11.4 | CQL spec 3.4.4 | Native protocol v4]

Use HELP for help.

cqlsh>  describe cluster;

Cluster: Test Cluster
Partitioner: Murmur3Partitioner

cqlsh>  describe keyspaces;

system_traces  system_schema  system_auth  system  system_distributed


cqlsh> CREATE KEYSPACE people with replication = {'class':'SimpleStrategy','replication_factor':1};
cqlsh> use people;

cqlsh:people> CREATE TABLE users (
          ... id varchar,
          ... first_name varchar,
          ... last_name varchar,
          ... city varchar,
          ... emails varchar,
          ... PRIMARY KEY (id));


cqlsh:people> insert into users(id,first_name,last_name,city,emails) values ('101','Sankara','narayanan','PLTR','sa@sa.com');


cqlsh:people> insert into users(id,first_name,last_name,city,emails) values ('102','Harish','Kalyan','CHN','ha@ka.in');


cqlsh:people> select * from users;

 id  | city | emails    | first_name | last_name
-----+------+-----------+------------+-----------
 102 |  CHN |  ha@ka.in |     Harish |    Kalyan
 101 | PLTR | sa@sa.com |    Sankara | narayanan

cqlsh:people> describe users;

CREATE TABLE people.users (
    id text PRIMARY KEY,
    city text,
    emails text,
    first_name text,
    last_name text
) WITH bloom_filter_fp_chance = 0.01
    AND caching = {'keys': 'ALL', 'rows_per_partition': 'NONE'}
    AND comment = ''
    AND compaction = {'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy', 'max_threshold': '32', 'min_threshold': '4'}
    AND compression = {'chunk_length_in_kb': '64', 'class': 'org.apache.cassandra.io.compress.LZ4Compressor'}
    AND crc_check_chance = 1.0
    AND dclocal_read_repair_chance = 0.1
    AND default_time_to_live = 0
    AND gc_grace_seconds = 864000
    AND max_index_interval = 2048
    AND memtable_flush_period_in_ms = 0
    AND min_index_interval = 128
    AND read_repair_chance = 0.0
    AND speculative_retry = '99PERCENTILE';


// Get the Spark Cassandra Connector from the Maven repository:

<!-- https://mvnrepository.com/artifact/com.datastax.spark/spark-cassandra-connector -->
<dependency>
    <groupId>com.datastax.spark</groupId>
    <artifactId>spark-cassandra-connector_2.11</artifactId>
    <version>2.4.0</version>
</dependency>

For spark-shell --packages, use the coordinate:
com.datastax.spark:spark-cassandra-connector_2.11:2.4.0
----------------------------------
Alternatively, the connector jar can be downloaded directly from:
https://repo1.maven.org/maven2/com/datastax/spark/spark-cassandra-connector_2.11/2.4.0/spark-cassandra-connector_2.11-2.4.0.jar

For sbt, add the equivalent dependency:
// https://mvnrepository.com/artifact/com.datastax.spark/spark-cassandra-connector
libraryDependencies += "com.datastax.spark" %% "spark-cassandra-connector" % "2.4.0"
----------------------------------



// Run Spark with the above package:

spark-shell --packages com.datastax.spark:spark-cassandra-connector_2.11:2.4.0



scala> import com.datastax.spark.connector._
scala> import org.apache.spark.SparkConf
scala> import org.apache.spark.SparkContext
scala> import org.apache.spark.SparkContext._

// Note: spark-shell already provides a SparkContext (sc), so spark.driver.allowMultipleContexts is set to permit creating a second one
scala> val conf = new SparkConf().setMaster("local").setAppName("sample cassandra app").set("spark.cassandra.connection.host","localhost").set("spark.driver.allowMultipleContexts","true")

scala> val sc = new SparkContext(conf)
scala> val personRDD = sc.cassandraTable("people","users") // KeySpace, Table name

scala> personRDD.take(2).foreach(println)
CassandraRow{id: 101, city: PLTR, emails: sa@sa.com, first_name: Sankara, last_name: narayanan}
CassandraRow{id: 102, city: CHN, emails: ha@ka.in, first_name: Harish, last_name: Kalyan}
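
Each CassandraRow has typed getters (getString, getInt, and so on), so columns can be pulled out as plain Scala values. A minimal sketch using the columns created above:

scala> // extract (id, first_name) pairs; getString is the connector's typed column accessor
scala> personRDD.map(row => (row.getString("id"), row.getString("first_name"))).collect.foreach(println)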

scala> personRDD.count
res1: Long = 2                                                               
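Filters can also be pushed down to Cassandra with where(), which appends a CQL predicate to the query the connector generates instead of filtering inside Spark. A sketch, assuming the predicate is one Cassandra can serve (for a non-key column like city this relies on the connector's ALLOW FILTERING scan, so it is best suited to small tables or indexed columns):

scala> // the predicate runs server-side in Cassandra, not in Spark
scala> val chennaiUsers = sc.cassandraTable("people","users").where("city = ?", "CHN")
scala> chennaiUsers.collect.foreach(println)
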

scala> val df = spark.read.format("org.apache.spark.sql.cassandra").options(Map("keyspace" -> "people", "table" -> "users")).load

scala> df.show
+---+----+---------+----------+---------+                                     
| id|city|   emails|first_name|last_name|
+---+----+---------+----------+---------+
|101|PLTR|sa@sa.com|   Sankara|narayanan|
|102| CHN| ha@ka.in|    Harish|   Kalyan|
+---+----+---------+----------+---------+
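
Writes go the other way through the same connector. A minimal sketch that appends one row via the RDD API (saveToCassandra and SomeColumns come from the com.datastax.spark.connector._ import already in scope); the row values here are made up for illustration:

scala> // tuple order must match the SomeColumns list below
scala> val newRow = sc.parallelize(Seq(("103", "Ram", "Kumar", "BLR", "ra@ku.in")))
scala> newRow.saveToCassandra("people", "users", SomeColumns("id", "first_name", "last_name", "city", "emails"))

The DataFrame side has an equivalent writer: df.write.format("org.apache.spark.sql.cassandra").options(Map("keyspace" -> "people", "table" -> "users")).mode("append").save().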
