Thursday, February 28, 2019

PYSPARK PYCHARM PROJECT

Points:


  LFS (local file system) is not distributed

  HDFS, Hive and Cassandra are distributed, so they scale well

  Spark driver creation:

  SparkContext -----------> RDD

  SQLContext, HiveContext ==> DataFrames

  SparkSession ------> DataFrame, RDD

dataframes ==> csv, xml, json, tables (semi-structured/structured)


rdd --> unstructured data (needs cleaning); a short sketch of these entry points follows below
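
A minimal sketch of these entry points (local master and placeholder data, just for illustration):

from pyspark import SparkContext
from pyspark.sql import SQLContext, SparkSession

# SparkContext --> RDD (low level, useful for unstructured data/cleaning)
sc = SparkContext(master='local', appName='entrypoints')
rdd = sc.parallelize(['raw line 1', 'raw line 2'])

# SQLContext (1.x style) --> DataFrames
sqlc = SQLContext(sc)

# SparkSession (2.x) --> DataFrames and RDDs through a single object
spark = SparkSession.builder.master('local').appName('entrypoints').getOrCreate()
df = spark.createDataFrame([(1, 'a'), (2, 'b')], ['id', 'value'])
print(rdd.collect())
df.show()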
 

Step 1: Install PyCharm Community Edition

Step 2: File --> Settings --> Project Interpreter --> choose the Python 2.7 version

Step 3: Create a package (e.g. SQLPackage) --> then create a Python program --> Demo.py

Step 4: Run the below code


Input File:

[{
"Year": "2013",
"First Name": "JANE",
"County": "A",
"Sex": "F",
"Count": "27"
}, {
"Year": "2013",
"First Name": "JADE",
"County": "B",
"Sex": "M",
"Count": "26"
}, {
"Year": "2013",
"First Name": "JAMES",
"County": "C",
"Sex": "M",
"Count": "21"
}]


Driver program:

from pyspark.sql import SparkSession

def createdriver():
    # build (or reuse) a local SparkSession for the application
    spark = SparkSession.builder.master('local').appName('demoapp').getOrCreate()
    return spark



Main program:

from Demo import createdriver
import logging

# logging configuration
# logging levels are DEBUG, INFO, WARN, ERROR, FATAL
logging.basicConfig(filename='c:\\Users\\Nethra\\Desktop\\spark1.log', level=logging.INFO)

if __name__ == '__main__':
    sparkdriver = createdriver()
    logging.info('Spark driver created successfully')

    # reading the input parameters
    logging.info('Reading input parameters')
    file_format = raw_input("Enter the file format\n")
    input_file = raw_input("Enter the input file name\n")
    logging.info('input parameters read successfully')

    # reading the data
    df = sparkdriver.read.format(file_format).option('multiLine', True).load(input_file)
    logging.info('reading done successfully')

    # processing the data
    df.show(10)
    df.write.format('csv').mode('overwrite').save('c:\\Users\\Nethra\\Desktop\\dfout')

    # read the CSV output back and save it as a table
    df1 = sparkdriver.read.format('csv').load('c:\\Users\\Nethra\\Desktop\\dfout')
    df1.write.saveAsTable('dfTable')

    # stop the application/job
    logging.info('stopping the application')
    sparkdriver.stop()



Example 2: SparkContext program

from pyspark import SparkContext
from pyspark.sql.types import *

sc=SparkContext(master='local',appName='demoapp')
rd1=sc.textFile("c:\\Users\\Nethra\\Desktop\\spark1.log")
rd2=rd1.map(lambda x: str(x))
print(rd1.collect())
print(type(rd2))
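
As a small illustration of the 'rdd --> unstructured (cleaning)' point above, a sketch that cleans the same text file (the comma delimiter is an assumption about the file layout):

# drop blank lines, trim whitespace and split each record on comma (assumed delimiter)
rd3 = rd1.map(lambda line: line.strip())\
         .filter(lambda line: len(line) > 0)\
         .map(lambda line: line.split(','))
print(rd3.take(5))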

Example 3: Spark Session Program

from pyspark.sql.types import *
from Demo import createdriver

spark=createdriver()

rd1=spark.sparkContext.textFile("c:\\Users\\Nethra\\Desktop\\spark1.log")

print(rd1)

print(rd1.collect())


Example 4: Spark-Hive integration on Windows

Driver program:


 from pyspark.sql import SparkSession

 def createSparkDriver():
    spark=SparkSession.builder.master('local').appName('demoapp').\
    config('hive.metastore.uris','thrift://localhost:9083').\
    config('spark.sql.warehouse.dir','hdfs://localhost:8020/user/hive/warehouse').\
    enableHiveSupport().\
    getOrCreate()
    return spark

 
  from pyspark import SparkConf, SparkContext
  from pyspark.sql import SQLContext
  import os

  os.environ['HADOOP_HOME'] = 'C:\\hadoop'

  conf = SparkConf().set('spark.jars.packages', 'org.apache.spark:spark-hive_2.11:2.4.0')
  sc = SparkContext(master='local', appName='demoapp', conf=conf)
  sqlc = SQLContext(sc)

  df2 = sqlc.read.format('json').option('multiLine', True).load("c:\\")

  print(df2)

  df2.show(5)
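
A minimal usage sketch for the Hive-enabled driver above (the database/table names are just examples and assume Hive objects already exist):

spark = createSparkDriver()

# list the databases visible through the Hive metastore
spark.sql('show databases').show()

# example read of an existing Hive table (name is a placeholder)
# spark.sql('select * from db1.permtable').show(5)

spark.stop()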





















Wednesday, February 27, 2019

PYSPARK RDBMS OPERATIONS


from pyspark.sql import SparkSession

sparkdriver=SparkSession.builder.master('local').appName('demoapp').\
config('spark.jars.packages','mysql:mysql-connector-java:5.1.44').\
getOrCreate()

df_mysql=sparkdriver.read.format('jdbc').\
option('url','jdbc:mysql://localhost:3306').\
option('user','hadoop').\
option('password','hadoop').\
option('dbtable','world.city').\
load()
# instead of dbtable, a query can be pushed down so it runs on the MySQL server side, e.g.
# option('query','select CountryCode, count(*) as cnt from world.city group by CountryCode')

df_mysql.show(10)

+---+----------------+-----------+-------------+----------+
| ID|            Name|CountryCode|     District|Population|
+---+----------------+-----------+-------------+----------+
|  1|           Kabul|        AFG|        Kabol|   1780000|
|  2|        Qandahar|        AFG|     Qandahar|    237500|
|  3|           Herat|        AFG|        Herat|    186800|
|  4|  Mazar-e-Sharif|        AFG|        Balkh|    127800|
|  5|       Amsterdam|        NLD|Noord-Holland|    731200|
|  6|       Rotterdam|        NLD| Zuid-Holland|    593321|
|  7|            Haag|        NLD| Zuid-Holland|    440900|
|  8|         Utrecht|        NLD|      Utrecht|    234323|
|  9|       Eindhoven|        NLD|Noord-Brabant|    201843|
| 10|         Tilburg|        NLD|Noord-Brabant|    193238|
| 11|       Groningen|        NLD|    Groningen|    172701|
| 12|           Breda|        NLD|Noord-Brabant|    160398|
| 13|       Apeldoorn|        NLD|   Gelderland|    153491|
| 14|        Nijmegen|        NLD|   Gelderland|    152463|
| 15|        Enschede|        NLD|   Overijssel|    149544|
| 16|         Haarlem|        NLD|Noord-Holland|    148772|
| 17|          Almere|        NLD|    Flevoland|    142465|
| 18|          Arnhem|        NLD|   Gelderland|    138020|
| 19|        Zaanstad|        NLD|Noord-Holland|    135621|
| 20|´s-Hertogenbosch|        NLD|Noord-Brabant|    129170|
+---+----------------+-----------+-------------+----------+

only showing top 20 rows

df_mysql=sparkdriver.read.format('jdbc').\
options(url='jdbc:mysql://localhost:3306',user='hadoop',password='hadoop',dbtable='world.city').\
load()

df_mysql.show(10)
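
For larger tables, the JDBC source can also read in parallel by splitting on a numeric column; a sketch on the same world.city table using its ID column (the bounds and partition count below are assumptions):

df_par=sparkdriver.read.format('jdbc').\
option('url','jdbc:mysql://localhost:3306').\
option('user','hadoop').\
option('password','hadoop').\
option('dbtable','world.city').\
option('partitionColumn','ID').\
option('lowerBound','1').\
option('upperBound','5000').\
option('numPartitions','4').\
load()

print(df_par.rdd.getNumPartitions())
df_par.show(5)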

Function Explanation:

from pyspark.sql import SparkSession
from pyspark.sql.functions import *

sparkdriver=SparkSession.builder.master('local').appName('demoapp').\
config('spark.jars.packages','mysql:mysql-connector-java:5.1.44').\
getOrCreate()

f1=sparkdriver.sql("show functions")

print(type(f1))

print(f1.count())

#f1.show(296)

sparkdriver.sql('describe function aggregate').collect()
<class 'pyspark.sql.dataframe.DataFrame'>
296

Out[7]:
[Row(function_desc=u'Function: aggregate'),
 Row(function_desc=u'Class: org.apache.spark.sql.catalyst.expressions.ArrayAggregate'),
 Row(function_desc=u'Usage: \n      aggregate(expr, start, merge, finish) - Applies a binary operator to an initial state and all\n 
 elements in the array, and reduces this to a single state. The final state is converted\n      into the final result by applying a finish function.\n    ')]

Typical daily load: reading from source --> daily basis --> transformation --> loading into the warehouse
29/11 --> data1
30/11 --> data2
1/12  --> data3

df_mysql.show(5)

df1=df_mysql.withColumn("day",current_date())

df1=df_mysql.withColumn("day",current_timestamp())

df1=df_mysql.withColumn("day",date_add(current_timestamp(),1))

df1=df_mysql.withColumn("day",date_add(current_date(),1))

df1.select('end_time','day',year('day'),month('day'),hour('day')).show(5)

df1.write.format('json').partitionBy('day').mode('append').save("  ")

df1.show()

df1.write.partitionBy('day').mode('append').saveAsTable('calldatatable')

print('success')

from pyspark.sql.types import *

def extract(x):
  x1=x.split('T')[0].split('-')
  year=(int)(x1[0])
  month=(int)(x1[1])
  day=(int)(x1[2])
  return [year,month,day]

print(extract('2014-03-15T22:34:23'))

[2014, 3, 15]

sparkdriver.udf.register('myfun', extract, ArrayType(IntegerType()))

myfunc = udf(extract, ArrayType(IntegerType()))
df_mysql.select('endtime', myfunc('endtime')).show(5)
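
A small, self-contained usage sketch for the UDF above (the one-column DataFrame and the view name 'calls' are hypothetical, just to exercise the function):

# hypothetical one-row DataFrame with a timestamp string column
df_ts = sparkdriver.createDataFrame([('2014-03-15T22:34:23',)], ['endtime'])

# DataFrame-side usage
df_ts.select('endtime', myfunc('endtime').alias('ymd')).show(truncate=False)

# SQL-side usage through the registered name
df_ts.createOrReplaceTempView('calls')
sparkdriver.sql('select endtime, myfun(endtime) as ymd from calls').show(truncate=False)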

PYSPARK RESTAPI operation


RESTAPI --> reading data through URL

url-->fetchdata-->file-->dataframe

import requests
import json
from pyspark.sql import SparkSession

sparkdriver=SparkSession.builder.master('local').appName('demoapp').\
config('spark.jars.packages','mysql:mysql-connector-java:5.1.44').\
getOrCreate()

jsonapidata=requests.request('GET','https://api.github.com/users/hadley/orgs')
jsonapidata

jsondata=jsonapidata.json()

print(type(len(jsonapidata.json())))

print(len(jsonapidata.json()))

file = open("C:\\Users\\Nethra\\Desktop\\restapi.json", 'a')

# write one JSON record per line so Spark's json reader can parse it
for record in jsondata:
    file.write("%s\n" % json.dumps(record))
file.close()

df_json = sparkdriver.read.format('json').load("C:\\Users\\Nethra\\Desktop\\restapi.json")

# optional: look at each row as a plain string via the underlying RDD
df_update = df_json.rdd.map(lambda x: (str(x),)).toDF()

df_json.show(2)

print("success")

PYSPARK multiline JSON operation


Multiline JSON:


from pyspark.sql import SparkSession

sparkdriver=SparkSession.builder.master('local').appName('demoapp4').\
config('spark.jars.packages','mysql:mysql-connector-java:5.1.44').\
getOrCreate()

df_json=sparkdriver.read.format("json").option('multiLine',True).load("file:///home/hadoop/mul.json")

df_json.show(5)

df_json.printSchema()  
     
+-----+------+----------+---+----+
|Count|County|First Name|Sex|Year|
+-----+------+----------+---+----+
|   27|     A|      JANE|  F|2013|
|   26|     B|      JADE|  M|2013|
|   21|     C|     JAMES|  M|2013|
+-----+------+----------+---+----+

root
 |-- Count: string (nullable = true)
 |-- County: string (nullable = true)
 |-- First Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Year: string (nullable = true)

Tuesday, February 26, 2019

PYSPARK HDFS Operations

Input file:

id,starttime,endtime,status
101,2014-01-01,2013-01-01,DROPPED
102,2014-01-01,2013-01-01,DROPPED
103,2014-01-01,2013-01-01,DROPPED
104,2014-01-01,2013-01-01,FAILED
105,2014-01-01,2013-01-01,FAILED
106,2014-01-01,2013-01-01,FAILED
107,2014-01-01,2013-01-01,SUCCESS
108,2014-01-01,2013-01-01,SUCCESS
109,2014-01-01,2013-01-01,SUCCESS

Pyspark Dataframe Operations:

from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *
sparkdriver=SparkSession.builder.master('local').appName('demoapp2').getOrCreate()
sparkdriver

#Reading through RDD
r1=sparkdriver.sparkContext.textFile("hdfs://127.0.0.1/user/sample/sample.csv")

r1.getNumPartitions()

hostname   ==> shows the local host name (Ubuntu machine)

ping 127.0.0.1

In Windows, the code is:
df_hdfs=sparkdriver.read.format('csv').option('delimiter',',').schema(sch).\
load('hdfs://172.16.38.131:8020/user/sample/sample.csv')

df_hdfs.show(5)

+---+----------+----------+-------+
|_c0|       _c1|       _c2|    _c3|
+---+----------+----------+-------+
|101|2014-01-01|2013-01-01|DROPPED|
|102|2014-01-01|2013-01-01|DROPPED|
|103|2014-01-01|2013-01-01|DROPPED|
|104|2014-01-01|2013-01-01| FAILED|
|105|2014-01-01|2013-01-01| FAILED|
|106|2014-01-01|2013-01-01| FAILED|
|107|2014-01-01|2013-01-01|SUCCESS|
|108|2014-01-01|2013-01-01|SUCCESS|
|109|2014-01-01|2013-01-01|SUCCESS|

+---+----------+----------+-------+
| id| starttime|   endtime| status|
+---+----------+----------+-------+
|101|2014-01-01|2013-01-01|DROPPED|
|102|2014-01-01|2013-01-01|DROPPED|
|103|2014-01-01|2013-01-01|DROPPED|
|104|2014-01-01|2013-01-01| FAILED|
|105|2014-01-01|2013-01-01| FAILED|
|106|2014-01-01|2013-01-01| FAILED|
|107|2014-01-01|2013-01-01|SUCCESS|
|108|2014-01-01|2013-01-01|SUCCESS|
|109|2014-01-01|2013-01-01|SUCCESS|
+---+----------+----------+-------+

hdfs dfs -ls /dfdata1
hdfs dfs -cat filename

file --> 10 blocks
spark (file) --> processed as 10 partitions

print(df_hdfs.rdd.getNumPartitions())

print(sparkdriver.sparkContext.defaultParallelism)

No. of blocks = No. of partitions

sch = StructType([
    StructField('id', StringType()),
    StructField('starttime', StringType()),
    StructField('endtime', StringType()),
    StructField('status', StringType())
])

+---+----------+----------+-------+
| id| starttime|   endtime| status|
+---+----------+----------+-------+
|101|2014-01-01|2013-01-01|DROPPED|
|102|2014-01-01|2013-01-01|DROPPED|
|103|2014-01-01|2013-01-01|DROPPED|
|104|2014-01-01|2013-01-01| FAILED|
|105|2014-01-01|2013-01-01| FAILED|
|106|2014-01-01|2013-01-01| FAILED|
|107|2014-01-01|2013-01-01|SUCCESS|
|108|2014-01-01|2013-01-01|SUCCESS|
|109|2014-01-01|2013-01-01|SUCCESS|
+---+----------+----------+-------+
    
spark (Scala -> JVM objects) ---- py4j ---- Python objects


df_hdfs.where('status="SUCCESS"').show(5)

df_hdfs.withColumn('fromno',col('fromno').cast('long')).select('fromno',col('fromno')/10).show()

df_hdfs.withColumn('fromno',col('fromno').cast('long')).withColumn('fromnonew',col('fromno')/10).show()

df_hdfs.select('fromno',col('fromno')/10).show()

df_hdfs.select('status').distinct().show()

df_hdfs.withColumn('severity',when(col('status')=="DROPPED",'critical').
when(col('status')=="FAILED",'Medium').otherwise('ok')).show(10)

df_hdfs.withColumn('severity',when(col('status')=="DROPPED",2).
when(col('status')=="FAILED",1).otherwise(0)).show(10)

withColumn adds a new column, or replaces it if a column with that name already exists:
df2_hdfs=df1_hdfs.withColumn('severity',when(col('status')=="DROPPED",2).\
when(col('status')=="FAILED",1).otherwise(0))

df2_hdfs.show(5)

+---+----------+----------+-------+------+--------+
| id| starttime|   endtime| status|fromno|severity|
+---+----------+----------+-------+------+--------+
|101|2014-01-01|2013-01-01|DROPPED|   101|       2|
|102|2014-01-01|2013-01-01|DROPPED|   102|       2|
|103|2014-01-01|2013-01-01|DROPPED|   103|       2|
|104|2014-01-01|2013-01-01| FAILED|   104|       1|
|105|2014-01-01|2013-01-01| FAILED|   105|       1|

df2_hdfs=df1_hdfs.withColumn('status',when(col('status')=="DROPPED",2).\
when(col('status')=="FAILED",1).otherwise(0))

df2_hdfs.show(5)

+---+----------+----------+------+------+
| id| starttime|   endtime|status|fromno|
+---+----------+----------+------+------+
|101|2014-01-01|2013-01-01|     2|   101|
|102|2014-01-01|2013-01-01|     2|   102|
|103|2014-01-01|2013-01-01|     2|   103|
|104|2014-01-01|2013-01-01|     1|   104|
|105|2014-01-01|2013-01-01|     1|   105|

df_hdfs.withColumn('status',when(col('status')=="DROPPED",2).
when(col('status')=="FAILED",1).otherwise(0)).show(10)

To add a column with the same literal value for all rows:

df_hdfs2=df_hdfs.withColumn('cnew',lit(200))

df_hdfs2.show(5)
+---+----------+----------+-------+------+--------+----+
| id| starttime|   endtime| status|fromno|severity|cnew|
+---+----------+----------+-------+------+--------+----+
|101|2014-01-01|2013-01-01|DROPPED|   101|       2| 200|
|102|2014-01-01|2013-01-01|DROPPED|   102|       2| 200|
|103|2014-01-01|2013-01-01|DROPPED|   103|       2| 200|
|104|2014-01-01|2013-01-01| FAILED|   104|       1| 200|
|105|2014-01-01|2013-01-01| FAILED|   105|       1| 200|
+---+----------+----------+-------+------+--------+----+

//Rename the existing columns

dfs_hdfs4=df_hdfs.withColumnRenamed('fromno','diallingno')

+---+----------+----------+-------+------+--------+
| id| starttime|   endtime| status|fromno|SEVERITY|
+---+----------+----------+-------+------+--------+
|101|2014-01-01|2013-01-01|DROPPED|   101|       2|
|102|2014-01-01|2013-01-01|DROPPED|   102|       2|
|103|2014-01-01|2013-01-01|DROPPED|   103|       2|
|104|2014-01-01|2013-01-01| FAILED|   104|       1|
|105|2014-01-01|2013-01-01| FAILED|   105|       1|
+---+----------+----------+-------+------+--------+

//drop the columns

df_hdfs.drop('id','status').show(5)  ==> Spark 2.x can drop multiple columns at once, but Spark 1.x can drop only one column at a time
+----------+-------+------+--------+
|   endtime| status|fromno|severity|
+----------+-------+------+--------+
|2013-01-01|DROPPED|   101|       2|
|2013-01-01|DROPPED|   102|       2|
|2013-01-01|DROPPED|   103|       2|
|2013-01-01| FAILED|   104|       1|
|2013-01-01| FAILED|   105|       1|

df --> DSL (domain specific language)
tables --> pure SQL language


We can create tables (temporary, permanent) from dataframes if we want to use SQL

df_hdfs.registerTempTable('temptable1')

sparkdriver.sql('select count(*) from temptable1 group by status').show()

df_hdfs2.select('severity').printSchema()

sparkdriver.sql('create database db1').show()

sparkdriver.sql('show databases').show()

sparkdriver.sql('show tables in default').show()

df_hdfs.write.saveAsTable('db1.permtable')

sparkdriver.sql('show tables in db1').show()

database and tables ---> permanent

storage system --> LFS (spark-warehouse); a metastore is available from Spark 2.0 onwards
metastore --> embedded Derby


Creation of a permanent table is not supported in Spark 1.x without Hive integration, but it is supported in Spark 2.x

df_hdfs.createOrReplaceTempView('temptable2')  --> latest API; a short usage sketch follows below
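
A short usage sketch of the newer temp-view API with the same dataframe:

df_hdfs.createOrReplaceTempView('temptable2')
sparkdriver.sql('select status, count(*) as cnt from temptable2 group by status').show()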

Pyspark JDBC and XML integration


from pyspark.sql import SparkSession

#sparkdriver=SparkSession.builder.master('local').config('spark.jars.packages','mysql:mysql-connector-java:5.1.44').config('spark.jars','c://..............").\
#config('spark.jars.repositories','http://repository.cloudera.com/artifactory/cloudera-repos').\
#appName('demoapp').getOrCreate()

#Reading from MySQL

from pyspark.sql import SparkSession

sparkdriver=SparkSession.builder.master('local').config('spark.jars.packages','mysql:mysql-connector-java:5.1.44').\
appName('demoapp').getOrCreate()
df_mysql=sparkdriver.read.format("jdbc").\
option('url','jdbc:mysql://localhost:3306').\
option('driver','com.mysql.jdbc.Driver').\
option('user','hadoop').\
option('password','hadoop').\
option('dbTable','world.city').load()


df_mysql.show(10)

| ID|            Name|CountryCode|     District|Population|
+---+----------------+-----------+-------------+----------+
|  1|           Kabul|        AFG|        Kabol|   1780000|
|  2|        Qandahar|        AFG|     Qandahar|    237500|
|  3|           Herat|        AFG|        Herat|    186800|
|  4|  Mazar-e-Sharif|        AFG|        Balkh|    127800|
|  5|       Amsterdam|        NLD|Noord-Holland|    731200|
|  6|       Rotterdam|        NLD| Zuid-Holland|    593321|
|  7|            Haag|        NLD| Zuid-Holland|    440900|
|  8|         Utrecht|        NLD|      Utrecht|    234323|
|  9|       Eindhoven|        NLD|Noord-Brabant|    201843|
| 10|         Tilburg|        NLD|Noord-Brabant|    193238|
| 11|       Groningen|        NLD|    Groningen|    172701|
| 12|           Breda|        NLD|Noord-Brabant|    160398|
| 13|       Apeldoorn|        NLD|   Gelderland|    153491|
| 14|        Nijmegen|        NLD|   Gelderland|    152463|
| 15|        Enschede|        NLD|   Overijssel|    149544|
| 16|         Haarlem|        NLD|Noord-Holland|    148772|
| 17|          Almere|        NLD|    Flevoland|    142465|
| 18|          Arnhem|        NLD|   Gelderland|    138020|
| 19|        Zaanstad|        NLD|Noord-Holland|    135621|
| 20|´s-Hertogenbosch|        NLD|Noord-Brabant|    129170|

+---+----------------+-----------+-------------+----------+

df_mysql.printSchema()
root
 |-- ID: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- CountryCode: string (nullable = true)
 |-- District: string (nullable = true)
 |-- Population: integer (nullable = true)

#Filter the values
df_mysql.filter('CountryCode="NLD" and District="Gelderland"').show(5)

#Read the json file
input file:
{"name":"Michael", "salary":3000}
{"name":"Andy", "salary":4500}
{"name":"Justin", "salary":3500}
{"name":"Berta", "salary":4000}


df_local=sparkdriver.read.format('json').load("file:///home/hadoop/employee.json")

df_local.show(5)
+-------+------+
|   name|salary|
+-------+------+
|Michael|  3000|
|   Andy|  4500|
| Justin|  3500|
|  Berta|  4000|
df_local2=df_local.select('name','salary')

#Writing into SQL

df_mysqlw=df_local2.write.format("jdbc").\
option('url','jdbc:mysql://localhost:3306').\
option('driver','com.mysql.jdbc.Driver').\
option('user','hadoop').\
option('password','hadoop').\
option('dbTable','employee').\
mode('append').\
save()

#check in MYSQL
mysql> select * from employee;
+---------+--------+
| name    | salary |
+---------+--------+
| Michael |   3000 |
| Andy    |   4500 |
| Justin  |   3500 |
| Berta   |   4000 |
+---------+--------+
4 rows in set (0.00 sec)

#Reading the XML files

<Records>
<Rec>
 <Name>John</Name>
 <Age>10</Age>
 <Gender>M</Gender>
</Rec>
<Rec>
 <Name>Jenny</Name>
 <Age>12</Age>
 <Gender>F</Gender>
</Rec>
<Rec>
 <Name>Janardhan</Name>
 <Age>14</Age>
 <Gender>M</Gender>
</Rec>
</Records>


pyspark --packages  com.databricks:spark-xml_2.11:0.4.1

from pyspark.sql import SparkSession
sparkdriver=SparkSession.builder.master('local').config('spark.jars.packages','com.databricks:spark-xml_2.11:0.4.1').\
config('spark.jars','file:///home/hadoop/Downloads/spark-xml-0.1.1-s_2.11.jar').\
appName('demoapp1').getOrCreate()
df_xml=sparkdriver.read.format('xml').option('rowTag','Rec').load('file:///home/hadoop/records.xml')

df_xml.show()
+---+------+---------+
|Age|Gender|     Name|
+---+------+---------+
| 10|     M|     John|
| 12|     F|    Jenny|
| 14|     M|Janardhan|
+---+------+---------+

df_xml.select('Name','age').show(5)
+---------+---+
|     Name|age|
+---------+---+
|     John| 10|
|    Jenny| 12|
|Janardhan| 14|
+---------+---+

#writing into ORC file

df_xml.write.format('orc').partitionBy('Gender').save("file:///home/hadoop/xml_orc/")

Output:
------------

total 8
drwxr-xr-x 2 hadoop hadoop 4096 Feb 27 02:03 'Gender=F'
drwxr-xr-x 2 hadoop hadoop 4096 Feb 27 02:03 'Gender=M'
-rw-r--r-- 1 hadoop hadoop    0 Feb 27 02:03  _SUCCESS
hadoop@hadoop:~/xml_orc$ cd Gender=F
hadoop@hadoop:~/xml_orc/Gender=F$ ls
part-00000-79c6d983-a31d-4fc4-9117-95250f3d3cbb.c000.snappy.orc
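
A minimal sketch of reading the partitioned ORC output back into a dataframe (the partition column Gender is recovered from the directory names):

df_orc = sparkdriver.read.format('orc').load('file:///home/hadoop/xml_orc/')
df_orc.show()

# partition pruning: only the Gender=F directory is scanned
df_orc.filter('Gender="F"').show()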

Kafka Installation Steps

Step 1:
download kafka

http://www-eu.apache.org/dist/kafka/1.0.0/kafka_2.11-1.0.0.tgz

Step 2:

copy to cloudera VM.

Open terminal

tar -xvzf kafka_2.11-1.0.0.tgz

Step 3:

cd kafka_2.11-1.0.0
cd config

Edit server.properties
vi server.properties

Edit zookeeper.connect to point at the zookeeper host (quickstart.cloudera on the Cloudera VM):

zookeeper.connect=localhost:2181


---------------------

Step 4:


Make sure zookeeper is running.

cd /usr/lib/zookeeper/bin
sh zkCli.sh

Step 5:

Start the server:

kafka bin folder =>

cd /home/cloudera/kafka_2.11-1.0.0/bin

sh kafka-server-start.sh /home/cloudera/kafka_2.11-1.0.0/config/server.properties &

sh kafka-topics.sh --create --topic Payment_IND1 --zookeeper quickstart.cloudera:2181 --replication-factor 1 --partitions 3

sh kafka-topics.sh --zookeeper quickstart.cloudera:2181 --list

sh kafka-console-producer.sh --broker-list quickstart.cloudera:9092 --topic Payment_IND1

sh kafka-console-consumer.sh --zookeeper quickstart.cloudera:2181 --topic Payment_IND1 --from-beginning

netstat -anlp | grep 9092

spark-submit --class SparkKafkaIntegration Spark_Streaming_Kafka-0.0.1-jar-with-dependencies.jar quickstart.cloudera:2181 Test tamilboomimar 1



cat /tmp/kafka-logs/Payment_IND1-0/00000000000000000000.log
cat /tmp/kafka-logs/Payment_IND1-1/00000000000000000000.log
cat /tmp/kafka-logs/Payment_IND1-2/00000000000000000000.log
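
To read the same topic from PySpark, Structured Streaming's Kafka source can be used; a rough sketch, assuming pyspark is started with --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.0 and that the broker runs on quickstart.cloudera:9092 as above:

from pyspark.sql import SparkSession

spark = SparkSession.builder.master('local[2]').appName('kafkademo').getOrCreate()

# subscribe to the Payment_IND1 topic created above
df_kafka = spark.readStream.format('kafka').\
option('kafka.bootstrap.servers', 'quickstart.cloudera:9092').\
option('subscribe', 'Payment_IND1').\
option('startingOffsets', 'earliest').\
load()

# print the message values to the console
query = df_kafka.selectExpr('CAST(value AS STRING) as message').\
writeStream.format('console').start()
query.awaitTermination()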

Installation of the Jupyter notebook on Ubuntu


Python 2:

step 1:sudo apt install python-pip

step 2:pip --version

step 3:sudo pip install --upgrade pip

step 4:sudo pip install jupyter

step 5: jupyter notebook   --> once run, the notebook opens at localhost:8888 where you can run the program

Python 3:

Step1 :sudo apt install python3-pip
Step 2: pip3 install jupyter
Step 3: pip3 install py4j
step 4: jupyter notebook   --> once run, the notebook opens at localhost:8888 where you can run the program

For Windows,

https://www.anaconda.com/distribution/


Spark and Jupyter connectivity:

gedit ~/.bashrc    ==> add the lines below

 export PYSPARK_DRIVER_PYTHON=jupyter
 export PYSPARK_DRIVER_PYTHON_OPTS='notebook'
 export PYSPARK_PYTHON=/user/local/spark/python2.7
 export PYTHONPATH=$PYSPARK_PYTHON:$PYTHONPATH

then source ~/.bashrc
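
After sourcing .bashrc, running pyspark should open a notebook in the browser; a quick sanity check in the first cell (the spark object is created by the pyspark launcher):

# run inside the notebook started by the pyspark command
print(spark.version)
spark.range(5).show()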



Windows environment variable:

JAVA_HOME C:\Program Files\Java\jdk1.8.0_201
PYSPARK_DRIVER_PYTHON Jupyter
PYSPARK_DRIVER_PYTHON_OPTS  notebook
PYSPARK_PYTHON python2.7
SPARK_HOME F:\spark-2.4.0-bin-hadoop2.7

PATH-->;C:\Program Files\Java\jre1.8.0_201\bin;C:\Program Files\Java\jdk1.8.0_201\bin;F:\spark-2.4.0-bin-hadoop2.7\bin;C:\Windows\System32;C:\hadoop\bin;C:\Users\Nethra\Anaconda3;C:\Users\Nethra\Anaconda3\Scripts;C:\Users\Nethra\Anaconda3\DLLs;





Monday, February 25, 2019

Integrating Cassandra with Spark - Import / Export data between Spark and Cassandra

Cassandra's default port number : 9042

// Start Cassandra server

$ sudo service cassandra start

// Verify Cassandra is up

$ netstat -ln | grep 9042
tcp        0      0 127.0.0.1:9042          0.0.0.0:* 

// to start Cassandra Query Language

hadoop@hadoop:~$ cqlsh localhost

Connected to Test Cluster at localhost:9042.
[cqlsh 5.0.1 | Cassandra 3.11.4 | CQL spec 3.4.4 | Native protocol v4]
Use HELP for help.


Cassandra - Columnar Storage NoSQL
    - HBase is also columnar
Cassandra and HBase are from the same family

Hadoop :
Master/Slave Architecture
    Name Node, Data Node

HBase:
purely based on Hadoop
  Master : HMaster
  Slave : HRegion server

Cassandra : Peer To Peer Architecture
Nodes are logically connected as a ring

Every node can interact with every other node
There is no Master/Slave concept
(the Cassandra server daemon runs on each node)

Hive, MySQL - data stored in the form of Tables
Hive, MySQL, RDBMS: Database (Schema)-> Tables -> Rows -> Columns

KeySpace : Schema
KeySpace -> Tables

pure sql language - cql - cassandra query language

schema is known as Keyspace in Cassandra.

// show all schemas (databases)
cqlsh> describe keyspaces;

people  system_schema  system_auth  system  system_distributed  system_traces

//SimpleStrategy means (Single DataCentre and Single Rack)
cqlsh> create schema test1 with replication = {'class':'SimpleStrategy','replication_factor':1};

// Must Include DataCentre here
//NetworkTopologyStrategy with DataCentre : Multiple DataCentre and Multiple Racks

cqlsh> create keyspace if not exists test2 with replication = {'class':'NetworkTopologyStrategy','datacentre':1};

cqlsh> describe keyspaces;

test1   system_schema  system              system_traces
people  system_auth    system_distributed

CREATE KEYSPACE test1 WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '1'}  AND durable_writes = true;

test1  people         system_auth  system_distributed
test2  system_schema  system       system_traces 


// while creating cassandra table, primary key must be included - primary key is mandatory

 cqlsh> create table test1.employee(id int primary key, name text, salary int, dept text);

cqlsh> describe test1.employee;

CREATE TABLE test1.employee (
    id int PRIMARY KEY,
    dept text,
    name text,
    salary int
) WITH bloom_filter_fp_chance = 0.01
    AND caching = {'keys': 'ALL', 'rows_per_partition': 'NONE'}
    AND comment = ''
    AND compaction = {'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy', 'max_threshold': '32', 'min_threshold': '4'}
    AND compression = {'chunk_length_in_kb': '64', 'class': 'org.apache.cassandra.io.compress.LZ4Compressor'}
    AND crc_check_chance = 1.0
    AND dclocal_read_repair_chance = 0.1
    AND default_time_to_live = 0
    AND gc_grace_seconds = 864000
    AND max_index_interval = 2048
    AND memtable_flush_period_in_ms = 0
    AND min_index_interval = 128
    AND read_repair_chance = 0.0
    AND speculative_retry = '99PERCENTILE';


cqlsh> insert into test1.employee(id,name,salary,dept) values (101,'siva',3000,'bigdata');

cqlsh> insert into test1.employee(id,name,salary,dept) values (102,'sakthi',3500,'spark');

cqlsh> insert into test1.employee(id,name,salary,dept) values (103,'prakash',3600,'Java');

cqlsh> select * from test1.employee;

 id  | dept    | name    | salary
-----+---------+---------+--------
 102 |   spark |  sakthi |   3500
 101 | bigdata |    siva |   3000
 103 |    Java | prakash |   3600


cqlsh> create table test1.student(id int primary key, name text, course text, age int);
cqlsh> insert into test1.student(id,name,course,age) values (200,'Sanmugh','Spark',25);
cqlsh> insert into test1.student(id,name,age,course) values (201,'David',22,'Cassandra');
cqlsh> insert into test1.student(name,id,age,course) values ('stella',203,33,'Kafka');
cqlsh> insert into test1.student(name,id,age) values ('John',204,22);

cqlsh> describe test1;

CREATE KEYSPACE test1 WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '1'}  AND durable_writes = true;

CREATE TABLE test1.employee (
    id int PRIMARY KEY,
    dept text,
    name text,
    salary int
) WITH bloom_filter_fp_chance = 0.01
    AND caching = {'keys': 'ALL', 'rows_per_partition': 'NONE'}
    AND comment = ''
    AND compaction = {'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy', 'max_threshold': '32', 'min_threshold': '4'}
    AND compression = {'chunk_length_in_kb': '64', 'class': 'org.apache.cassandra.io.compress.LZ4Compressor'}
    AND crc_check_chance = 1.0
    AND dclocal_read_repair_chance = 0.1
    AND default_time_to_live = 0
    AND gc_grace_seconds = 864000
    AND max_index_interval = 2048
    AND memtable_flush_period_in_ms = 0
    AND min_index_interval = 128
    AND read_repair_chance = 0.0
    AND speculative_retry = '99PERCENTILE';

CREATE TABLE test1.student (
    id int PRIMARY KEY,
    age int,
    course text,
    name text
) WITH bloom_filter_fp_chance = 0.01
    AND caching = {'keys': 'ALL', 'rows_per_partition': 'NONE'}
    AND comment = ''
    AND compaction = {'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy', 'max_threshold': '32', 'min_threshold': '4'}
    AND compression = {'chunk_length_in_kb': '64', 'class': 'org.apache.cassandra.io.compress.LZ4Compressor'}
    AND crc_check_chance = 1.0
    AND dclocal_read_repair_chance = 0.1
    AND default_time_to_live = 0
    AND gc_grace_seconds = 864000
    AND max_index_interval = 2048
    AND memtable_flush_period_in_ms = 0
    AND min_index_interval = 128
    AND read_repair_chance = 0.0
    AND speculative_retry = '99PERCENTILE';


cqlsh> select * from test1.student;

 id  | age | course    | name
-----+-----+-----------+---------
 201 |  22 | Cassandra |   David
 204 |  22 |      null |    John
 203 |  33 |     Kafka |  stella
 200 |  25 |     Spark | Sanmugh


cqlsh> insert into test1.student(id) values (202);
cqlsh> select * from test1.student;

 id  | age  | course    | name
-----+------+-----------+---------
 201 |   22 | Cassandra |   David
 204 |   22 |      null |    John
 203 |   33 |     Kafka |  stella
 200 |   25 |     Spark | Sanmugh
 202 | null |      null |    null

Start spark in one more terminal:
----------------------------------
$  spark-shell --packages com.datastax.spark:spark-cassandra-connector_2.11:2.4.0 --conf spark.cassandra.connection.host=localhost

scala> import com.datastax.spark.connector._
import com.datastax.spark.connector._


scala> val r1 = sc.cassandraTable("test1","employee")
r1: com.datastax.spark.connector.rdd.CassandraTableScanRDD[com.datastax.spark.connector.CassandraRow] = CassandraTableScanRDD[0] at RDD at CassandraRDD.scala:19

scala> r1.collect.foreach(println)
CassandraRow{id: 102, dept: spark, name: sakthi, salary: 3500}               
CassandraRow{id: 101, dept: bigdata, name: siva, salary: 3000}
CassandraRow{id: 103, dept: Java, name: prakash, salary: 3600}

scala> val r2 = sc.cassandraTable("test1","student")
r2: com.datastax.spark.connector.rdd.CassandraTableScanRDD[com.datastax.spark.connector.CassandraRow] = CassandraTableScanRDD[1] at RDD at CassandraRDD.scala:19

scala> r2.collect.foreach(println)
CassandraRow{id: 202, age: null, course: null, name: null}                   
CassandraRow{id: 203, age: 33, course: Kafka, name: stella}
CassandraRow{id: 200, age: 25, course: Spark, name: Sanmugh}
CassandraRow{id: 201, age: 22, course: Cassandra, name: David}
CassandraRow{id: 204, age: 22, course: null, name: John}

//Without using Case Class:
// Adding schema to the RDD  just mention the data types only.
scala> val r1 = sc.cassandraTable[(Int,String,String,Int)]("test1","employee")
r1: com.datastax.spark.connector.rdd.CassandraTableScanRDD[(Int, String, String, Int)] = CassandraTableScanRDD[2] at RDD at CassandraRDD.scala:19

// Now it is Tuple Here
scala> r1.collect.foreach(println)
(102,spark,sakthi,3500)                                                       
(101,bigdata,siva,3000)
(103,Java,prakash,3600)

// Converting RDD into Dataframe
scala> val df1 = r1.toDF("id","dept","name","salary");
df1: org.apache.spark.sql.DataFrame = [id: int, dept: string ... 2 more fields]

scala> df1.show
2019-02-25 12:28:56 WARN  ReplicationStrategy$NetworkTopologyStrategy:200 - Error while computing token map for keyspace test2 with datacenter datacentre: could not achieve replication factor 1 (found 0 replicas only), check your keyspace replication settings.
+---+-------+-------+------+
| id|   dept|   name|salary|
+---+-------+-------+------+
|102|  spark| sakthi| 3500 |
|101|bigdata|   siva| 3000 |
|103|   Java|prakash| 3600 |
+---+-------+-------+------+


//With Using Case Class
scala> case class Emp(id:Int, Dept:String, Name:String, Salary:Int)
defined class Emp

scala> val r1 = sc.cassandraTable[Emp]("test1","employee")
r1: com.datastax.spark.connector.rdd.CassandraTableScanRDD[Emp] = CassandraTableScanRDD[8] at RDD at CassandraRDD.scala:19

// show the records as Emp objects
scala> r1.collect.foreach(println)
Emp(102,spark,sakthi,3500)                                                   
Emp(101,bigdata,siva,3000)
Emp(103,Java,prakash,3600)

//Making Dataframe from RDD
scala> val df = r1.toDF();
df: org.apache.spark.sql.DataFrame = [id: int, Dept: string ... 2 more fields]

scala> df.show

+---+-------+-------+------+                                                 
| id|   Dept|   Name|Salary|
+---+-------+-------+------+
|102|  spark| sakthi|  3500|
|101|bigdata|   siva|  3000|
|103|   Java|prakash|  3600|
+---+-------+-------+------+

// Before applying schema

scala> val r = sc.cassandraTable("test1","student")

r: com.datastax.spark.connector.rdd.CassandraTableScanRDD[com.datastax.spark.connector.CassandraRow] = CassandraTableScanRDD[13] at RDD at CassandraRDD.scala:19

scala> r.collect.foreach(println)

2019-02-25 12:38:03 WARN  ReplicationStrategy$NetworkTopologyStrategy:200 - Error while computing token map for keyspace test2 with datacenter datacentre: could not achieve replication factor 1 (found 0 replicas only), check your keyspace replication settings.
CassandraRow{id: 202, age: null, course: null, name: null}
CassandraRow{id: 203, age: 33, course: Kafka, name: stella}
CassandraRow{id: 200, age: 25, course: Spark, name: Sanmugh}
CassandraRow{id: 201, age: 22, course: Cassandra, name: David}
CassandraRow{id: 204, age: 22, course: null, name: John}

// Applying Schema here
scala> val r = sc.cassandraTable[(Int,Int,String,String)]("test1","student")
r: com.datastax.spark.connector.rdd.CassandraTableScanRDD[(Int, Int, String, String)] = CassandraTableScanRDD[14] at RDD at CassandraRDD.scala:19

// We have null values in our data, so we will get exception here
scala> r.collect.foreach(println)
com.datastax.spark.connector.types.TypeConversionException: Failed to convert column age of test1.student to Int: null


//If a non-null value is there in the table it will work. If a null value is there, it won't work
//Int cannot hold a null value
//We have a record where, except for id, all the columns are null

//Here we applied Option[DataType] for necessary column to avoid exception

scala> val r = sc.cassandraTable[(Option[Int],Option[Int],Option[String],Option[String])]("test1","student")
r: com.datastax.spark.connector.rdd.CassandraTableScanRDD[(Option[Int], Option[Int], Option[String], Option[String])] = CassandraTableScanRDD[15] at RDD at CassandraRDD.scala:19

// null value will be displayed as None

scala> r.collect.foreach(println)
2019-02-25 12:42:40 WARN  ReplicationStrategy$NetworkTopologyStrategy:200 - Error while computing token map for keyspace test2 with datacenter datacentre: could not achieve replication factor 1 (found 0 replicas only), check your keyspace replication settings.
(Some(202),None,None,None)                                                   
(Some(203),Some(33),Some(Kafka),Some(stella))
(Some(200),Some(25),Some(Spark),Some(Sanmugh))
(Some(201),Some(22),Some(Cassandra),Some(David))
(Some(204),Some(22),None,Some(John))

// converting RDD to Dataframe with column headers

scala> val df = r.toDF("id","age","course","name")
df: org.apache.spark.sql.DataFrame = [id: int, age: int ... 2 more fields]

//Null values are displayed here

scala> df.show
+---+----+----------+-------+
| id| age|course    |   name|
+---+----+----------+-------+
|202|null|      null|   null|
|203|  33|     Kafka| stella|
|200|  25|     Spark|Sanmugh|
|201|  22| Cassandra|  David|
|204|  22|      null|   John|
+---+----+----------+-------+

// Replacing nulls in course column with Bigdata

scala> val df1 = df.na.fill("Bigdata",Array("course"))
df1: org.apache.spark.sql.DataFrame = [id: int, age: int ... 2 more fields]


scala> df1.show
+---+----+---------+-------+
| id| age|   course|   name|
+---+----+---------+-------+
|202|null|  Bigdata|   null|
|203|  33|    Kafka| stella|
|200|  25|    Spark|Sanmugh|
|201|  22|Cassandra|  David|
|204|  22|  Bigdata|   John|
+---+----+---------+-------+


//We didn't mention column names, so whatever string fields have null will be replaced with 'Bigdata'

scala> df1.na.fill("Bigdata").show
+---+----+---------+-------+
| id| age|   course|   name|
+---+----+---------+-------+
|202|null|  Bigdata|Bigdata|  // Here Name is Bigdata -- wrong approach
|203|  33|    Kafka| stella|
|200|  25|    Spark|Sanmugh|
|201|  22|Cassandra|  David|
|204|  22|  Bigdata|   John|
+---+----+---------+-------+

// We didn't specify column names, so whatever numeric fields have null values will be replaced with 100

scala> df1.na.fill(100).show
2019-02-25 12:52:11 WARN  ReplicationStrategy$NetworkTopologyStrategy:200 - Error while computing token map for keyspace test2 with datacenter datacentre: could not achieve replication factor 1 (found 0 replicas only), check your keyspace replication settings.
+---+---+---------+-------+                                                   
| id|age|   course|   name|
+---+---+---------+-------+
|202|100|  Bigdata|   null| // Here age is 100 - wrong data
|203| 33|    Kafka| stella|
|200| 25|    Spark|Sanmugh|
|201| 22|Cassandra|  David|
|204| 22|  Bigdata|   John|
+---+---+---------+-------+


// it will drop whichever rows have null in any column
scala> df.na.drop().show

+---+---+---------+-------+
| id|age|   course|   name|
+---+---+---------+-------+
|203| 33|    Kafka| stella|
|200| 25|    Spark|Sanmugh|
|201| 22|Cassandra|  David|
+---+---+---------+-------+


// Here we are going to Export inmemory RDD content into Cassandra


//Here we create some in memory collection objects

Start spark in one more terminal:
----------------------------------
$  spark-shell --packages com.datastax.spark:spark-cassandra-connector_2.11:2.4.0 --conf spark.cassandra.connection.host=localhost

scala> import com.datastax.spark.connector._
import com.datastax.spark.connector._

scala> case class Emp(id:Int, Dept:String,Name:String, Salary:Int)
defined class Emp

scala> val ob1 = new Emp(121,"accounts","Hari",5000);
ob1: Emp = Emp(121,accounts,Hari,5000)

scala> val ob2 = new Emp(122,"HR","Rani",6000);
ob2: Emp = Emp(122,HR,Rani,6000)

scala> val ob3 = new Emp(123,"Marketing","Suresh",6500);
ob3: Emp = Emp(123,Marketing,Suresh,6500)

scala> val r1 = sc.makeRDD(Seq(ob1,ob2,ob3));
r1: org.apache.spark.rdd.RDD[Emp] = ParallelCollectionRDD[40] at makeRDD at <console>:33

scala> r1.collect.foreach(println);
Emp(121,accounts,Hari,5000)
Emp(122,HR,Rani,6000)
Emp(123,Marketing,Suresh,6500)

// Export RDD content into Cassandra
scala> r1.saveToCassandra("test1","employee");

// verify the newly inserted rows in Cassandra
cqlsh> select * from test1.employee;
-----+-----------+---------+--------
 id  | dept      | name    | salary
-----+-----------+---------+--------
 123 | Marketing |  Suresh |   6500 // newly inserted from spark to Cassandra
 122 |        HR |    Rani |   6000 // newly inserted from spark to Cassandra
 121 |  accounts |    Hari |   5000 // newly inserted from spark to Cassandra
 102 |     spark |  sakthi |   3500
 101 |   bigdata |    siva |   3000
 103 |      Java | prakash |   3600


Cassandra and Spark Integration




//start cassandra server

$ sudo service cassandra start

$ sudo update-rc.d cassandra defaults


//start CLI for Cassandra

$ cqlsh localhost
Connected to Test Cluster at localhost:9042.
[cqlsh 5.0.1 | Cassandra 3.11.4 | CQL spec 3.4.4 | Native protocol v4]

Use HELP for help.

cqlsh>  describe cluster;

Cluster: Test Cluster
Partitioner: Murmur3Partitioner

cqlsh>  describe keyspaces;

system_traces  system_schema  system_auth  system  system_distributed


cqlsh> CREATE KEYSPACE people with replication = {'class':'SimpleStrategy','replication_factor':1};
cqlsh> use people;

cqlsh:people> describe people;

CREATE TABLE users (
          ... id varchar,
          ... first_name varchar,
          ... last_name varchar,
          ... city varchar,
          ... emails varchar,
          ... PRIMARY KEY (id));


cqlsh:people> insert into users(id,first_name,last_name,city,emails) values ('101','Sankara','narayanan','PLTR','sa@sa.com');


cqlsh:people> insert into users(id,first_name,last_name,city,emails) values ('102','Harish','Kalyan','CHN','ha@ka.in');


cqlsh:people> select * from users;

 id  | city | emails    | first_name | last_name
-----+------+-----------+------------+-----------
 102 |  CHN |  ha@ka.in |     Harish |    Kalyan
 101 | PLTR | sa@sa.com |    Sankara | narayanan

cqlsh:people> describe users;

CREATE TABLE people.users (
    id text PRIMARY KEY,
    city text,
    emails text,
    first_name text,
    last_name text
) WITH bloom_filter_fp_chance = 0.01
    AND caching = {'keys': 'ALL', 'rows_per_partition': 'NONE'}
    AND comment = ''
    AND compaction = {'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy', 'max_threshold': '32', 'min_threshold': '4'}
    AND compression = {'chunk_length_in_kb': '64', 'class': 'org.apache.cassandra.io.compress.LZ4Compressor'}
    AND crc_check_chance = 1.0
    AND dclocal_read_repair_chance = 0.1
    AND default_time_to_live = 0
    AND gc_grace_seconds = 864000
    AND max_index_interval = 2048
    AND memtable_flush_period_in_ms = 0
    AND min_index_interval = 128
    AND read_repair_chance = 0.0
    AND speculative_retry = '99PERCENTILE';


//Get Spark Cassandra Connector from Maven repository:

<!-- https://mvnrepository.com/artifact/com.datastax.spark/spark-cassandra-connector -->
<dependency>
    <groupId>com.datastax.spark</groupId>
    <artifactId>spark-cassandra-connector_2.11</artifactId>
    <version>2.4.0</version>
</dependency>

Make this:
com.datastax.spark:spark-cassandra-connector_2.11:2.4.0
----------------------------------
It is download connector jar from : https://repo1.maven.org/maven2/com/datastax/spark/spark-cassandra-connector_2.11/2.4.0/spark-cassandra-connector_2.11-2.4.0.jar
// https://mvnrepository.com/artifact/com.datastax.spark/spark-cassandra-connector
libraryDependencies += "com.datastax.spark" %% "spark-cassandra-connector" % "2.4.0"
----------------------------------



// Run Spark with the above package:

spark-shell --packages com.datastax.spark:spark-cassandra-connector_2.11:2.4.0



scala> import com.datastax.spark.connector._
scala> import org.apache.spark.SparkConf
scala> import org.apache.spark.SparkContext
scala> import org.apache.spark.SparkContext._

scala> val conf = new SparkConf().setMaster("local").setAppName("sample cassandra app").set("spark.cassandra.connection.host","localhost").set("spark.driver.allowMultipleContexts","true")

scala> val sc = new SparkContext(conf)
scala> val personRDD = sc.cassandraTable("people","users") // KeySpace, Table name

scala> personRDD.take(2).foreach(println)
CassandraRow{id: 101a, city: PLTR, emails: sa@sa.com, first_name: Sankara, last_name: narayanan}
CassandraRow{id: 102, city: CHN, emails: ha@ka.in, first_name: Harish, last_name: Kalyan}

scala> personRDD.count
res1: Long = 2                                                               

scala> val df = spark.read.format("org.apache.spark.sql.cassandra").options(Map("keyspace" -> "people", "table" -> "users")).load

scala> df.show
+---+----+---------+----------+---------+                                     
| id|city|   emails|first_name|last_name|
+---+----+---------+----------+---------+
|101|PLTR|sa@sa.com|   Sankara|narayanan|
|102| CHN| ha@ka.in|    Harish|   Kalyan|
+---+----+---------+----------+---------+
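
The same DataFrame read also works from PySpark with the identical connector package; a sketch, assuming pyspark is launched with --packages com.datastax.spark:spark-cassandra-connector_2.11:2.4.0:

from pyspark.sql import SparkSession

spark = SparkSession.builder.master('local').appName('cassandrademo').\
config('spark.cassandra.connection.host', 'localhost').\
getOrCreate()

# KeySpace: people, Table: users (created above in cqlsh)
df_users = spark.read.format('org.apache.spark.sql.cassandra').\
options(keyspace='people', table='users').\
load()
df_users.show()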

Cassandra Installation on Ubuntu Steps




$ sudo apt-get upgrade

$ sudo apt autoremove

$ sudo apt-key adv --keyserver pool.sks-keyservers.net --recv-key A278B781FE4B2BDA

$ sudo apt-get update

// verify Python version
$ python -V
Python 2.7.15+



Installing Cassandra:
-----------------------

$ echo "deb http://www.apache.org/dist/cassandra/debian 311x main" | sudo tee -a /etc/apt/sources.list.d/cassandra.sources.list
deb http://www.apache.org/dist/cassandra/debian 311x main

$ curl https://www.apache.org/dist/cassandra/KEYS | sudo apt-key add -

$ sudo apt-get update

$ sudo apt-get install cassandra

//start cassandra server
$ sudo service cassandra start

$ sudo update-rc.d cassandra defaults

hadoop@hadoop:/usr/local$ nodetool status

Datacenter: datacenter1
=======================
Status=Up/Down
|/ State=Normal/Leaving/Joining/Moving
--  Address    Load       Tokens       Owns (effective)  Host ID                               Rack
UN  127.0.0.1  103.67 KiB  256          100.0%            a8c39288-3d56-4768-ba56-9910e9ce02e2  rack1

//start CLI for Cassandra

$ cqlsh localhost
Connected to Test Cluster at localhost:9042.
[cqlsh 5.0.1 | Cassandra 3.11.4 | CQL spec 3.4.4 | Native protocol v4]
Use HELP for help.

Saturday, February 23, 2019

Flume Spark Integration

Step 1:

flumespark.conf
------------------------------
//creation of components
agent1.sources = flumesource
agent1.channels = flumechannel
agent1.sinks = flumesink

//Source Configuration
agent1.sources.flumesource.type = netcat
agent1.sources.flumesource.bind = localhost
agent1.sources.flumesource.port = 1234
agent1.sources.flumesource.channels = flumechannel

//Channel Configuration
agent1.channels.flumechannel.type = memory
agent1.channels.flumechannel.capacity=1000
agent1.channels.flumechannel.transactionCapacity=100

//Sink Configuration
agent1.sinks.flumesink.type = avro
agent1.sinks.flumesink.hostname = 192.168.0.104    =====> Windows IP address
agent1.sinks.flumesink.port = 7777
agent1.sinks.flumesink.channel = flumechannel



2) Set up the program in Windows IntelliJ

import org.apache.spark.streaming.flume._
import org.apache.spark.streaming.{Seconds,StreamingContext}
import org.apache.spark.{SparkConf,SparkContext}
import org.apache.log4j.Logger
import org.apache.log4j.Level
object FlumeSpark {
  Logger.getLogger("org").setLevel(Level.OFF)
  Logger.getLogger("akka").setLevel(Level.OFF)
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("spark-flume integration (push-based approach)").setMaster("local[2]")
    val sc = new SparkContext(conf)
    val BatchInterval = 20
    val host = "192.168.0.104"
    val portno = 7777
    val ssc = new StreamingContext(sc, Seconds(BatchInterval))

    //Pulling the data from flume application
    val flumedata = FlumeUtils.createStream(ssc, host, portno)
    val res = flumedata.map { x =>
      val event = x.event
      val messageBody = new String(event.getBody.array())
      messageBody
    }
    res.print()
    ssc.start()
    ssc.awaitTermination()
  }
}


build.sbt in the intellij


name := "SparkSampleProgram"

version := "0.1"

scalaVersion := "2.11.12"
libraryDependencies += "org.apache.spark" %% "spark-core" % "2.4.0"
libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.4.0"
libraryDependencies += "org.apache.spark" %% "spark-streaming" % "2.4.0"

libraryDependencies += "org.apache.spark" %% "spark-streaming-twitter" % "1.6.3"
libraryDependencies += "org.apache.spark" %% "spark-streaming-flume-assembly" % "2.3.2"


sbt.version = 1.0.3


3) Run the Flume agent in Ubuntu


flume-ng agent --name agent1 --conf /home/hadoop/Desktop/vow --conf-file /home/hadoop/Desktop/vow/flumespark1.conf -Dflume.root.logger=DEBUG,console


4) Send a message from Ubuntu

curl telnet://localhost:1234
I love india   ---> this message is passed to the Windows host



Python Challenges Program

Challenges program: program 1: #Input :ABAABBCA #Output: A4B3C1 str1="ABAABBCA" str2="" d={} for x in str1: d[x]=d...