Saturday, March 16, 2019

Kafka Spark Streaming [a JSON file is processed in Spark Streaming and loaded into a MySQL database]

input file:
nameList.json:
--------------
{"id":992,"name":"Herman","city":"Iles","country":"Colombia","Skills":"CVE"},
{"id":993,"name":"Burton","city":"Santo Tomas","country":"Philippines","Skills":"VMware vSphere"},
{"id":994,"name":"Correna","city":"Shirgjan","country":"Albania","Skills":"Wyse"},
{"id":995,"name":"Cathi","city":"Dorūd","country":"Iran","Skills":"SSCP"},
{"id":996,"name":"Lena","city":"Al Judayrah","country":"Palestinian Territory","Skills":"Commercial Kitchen Design"},
{"id":997,"name":"Madalena","city":"Livadiya","country":"Ukraine","Skills":"Software Development"},
{"id":998,"name":"Jo-anne","city":"Khatsyezhyna","country":"Belarus","Skills":"TPD"},
{"id":999,"name":"Georgi","city":"Pasuruan","country":"Indonesia","Skills":"Project Engineering"},
{"id":1000,"name":"Scott","city":"Gyumri","country":"Armenia","Skills":"RHEV"}



start zookeeper:
hadoop@hadoop:/usr/local/kafka$ bin/zookeeper-server-start.sh config/zookeeper.properties 
start kafka server:
hadoop@hadoop:/usr/local/kafka$ bin/kafka-server-start.sh config/server.properties 
bin/kafka-topics.sh --create --topic jsonTopic --partitions 1 --replication-factor 1 --zookeeper localhost:2181

//view the topics available in Kafka Broker
hadoop@hadoop:/usr/local/kafka$  bin/kafka-topics.sh --list --zookeeper localhost:2181
jsonTopic



// create a database and table in MySQL:

hadoop@hadoop:/usr/local/kafka$ sudo mysql;
[sudo] password for hadoop: 
Welcome to the MySQL monitor.  Commands end with ; or \g.
Your MySQL connection id is 4
Server version: 5.7.25-0ubuntu0.18.10.2 (Ubuntu)

Copyright (c) 2000, 2019, Oracle and/or its affiliates. All rights reserved.

Oracle is a registered trademark of Oracle Corporation and/or its
affiliates. Other names may be trademarks of their respective
owners.

Type 'help;' or '\h' for help. Type '\c' to clear the current input statement.

mysql> create database KafkaDB;
Query OK, 1 row affected (0.05 sec)

mysql> use KafkaDB;
Database changed
mysql> create table jsonTable (id int, name varchar(50), city varchar(50), country varchar(50), skills varchar(50));
Query OK, 0 rows affected (0.20 sec)




// The producer reads the JSON file and publishes each line to the Kafka topic
Producer Programming in Scala:
-------------------------------
import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig,ProducerRecord}

/**
 * Publishes every line of a local JSON file to the Kafka topic `jsonTopic`.
 *
 * Each line of the input file is sent as the value of one record (no key),
 * using string serializers against the broker at `localhost:9092`.
 */
object JsonPublisherExa {

  /**
   * Entry point.
   *
   * @param args unused; the broker address, topic and input path are hard-coded
   */
  def main(args: Array[String]): Unit = {
    val props = new Properties()

    props.put("bootstrap.servers", "localhost:9092")
    props.put("acks", "all")
    props.put("client.id", "ProducerApp")
    props.put("retries", "4")
    props.put("batch.size", "32768")
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

    val topic = "jsonTopic"
    val producer = new KafkaProducer[String, String](props)

    try {
      val file = scala.io.Source.fromFile("/home/hadoop/Desktop/vow/nameList.json")
      try {
        for (line <- file.getLines()) {
          val msg = new ProducerRecord[String, String](topic, line)
          producer.send(msg)
        }
      } finally {
        // Release the file handle even when reading or sending fails.
        file.close()
      }
    } finally {
      // Always close the producer so it is released even when the file
      // cannot be opened or read; close() also waits for pending sends.
      producer.close()
    }
  }
}

Run the below command:
>spark-shell

scala> :load JsonPublisherExa.scala

scala> JsonPublisherExa.main(null)

// Alternatively, run the producer program in IntelliJ IDEA. Then verify the published messages with the console consumer:
hadoop@hadoop:/usr/local/kafka$ bin/kafka-console-consumer.sh --topic jsonTopic --bootstrap-server localhost:9092 --from-beginning

{"id":1,"name":"Sharline","city":"Uppsala","country":"Sweden","Skills":"Eyelash Extensions"},
{"id":2,"name":"Marris","city":"São Domingos","country":"Cape Verde","Skills":"VMI Programs"},
{"id":3,"name":"Gregg","city":"Qaxbaş","country":"Azerbaijan","Skills":"Historical Research"},
{"id":4,"name":"Christye","city":"Guarapari","country":"Brazil","Skills":"Army"},
{"id":5,"name":"Modesta","city":"Paltamo","country":"Finland","Skills":"Avaya Technologies"},
// In spark-shell, stop the pre-created SparkContext first, because KafkaMySQL creates its own:
sc.stop()


import org.apache.spark.SparkConf
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.log4j.Logger
import org.apache.log4j.Level
import java.util.Properties
import org.apache.spark.sql.{SQLContext,SaveMode}
import org.apache.spark.streaming.{Seconds,StreamingContext}
import org.apache.spark.{SparkConf,SparkContext}

/**
 * Spark Streaming job that reads JSON strings from the Kafka topic
 * `jsonTopic` (receiver-based stream via ZooKeeper at `localhost:2181`)
 * and appends each micro-batch to the MySQL table `KafkaDB.jsonTable`.
 */
object KafkaMySQL {

  /**
   * Entry point. Builds the streaming context, wires the Kafka stream to the
   * JDBC sink and blocks until the streaming context terminates.
   *
   * @param args unused; all connection settings are hard-coded
   */
  def main(args: Array[String]): Unit = {
    Logger.getLogger("org").setLevel(Level.OFF)
    Logger.getLogger("akka").setLevel(Level.OFF)

    // local[2]: the receiver occupies one thread, so at least one more is
    // needed to process the received batches.
    val conf = new SparkConf().setAppName("SparkStreamingJson").setMaster("local[2]")
    val sc = new SparkContext(conf)
    val sqlc = new SQLContext(sc)

    val batchInterval = 20
    val zk = "localhost:2181"
    val consumerGroupID = "jsonGroup"
    val topic = "jsonTopic"

    // The map value for createStream is the number of consumer threads for
    // the topic, not a partition index; it must be at least 1 or nothing
    // can be consumed.
    val consumerThreads = 1
    val perTopicThreads = Map(topic -> consumerThreads)

    val ssc = new StreamingContext(sc, Seconds(batchInterval))
    val kafkaData = KafkaUtils.createStream(ssc, zk, consumerGroupID, perTopicThreads)

    // Records arrive as (key, value) pairs; only the JSON value is needed.
    val jsonLines = kafkaData.map(_._2)

    // JDBC connection settings are identical for every batch, so build them once.
    val props = new Properties()
    props.put("driver", "com.mysql.jdbc.Driver")
    props.put("user", "hadoop")
    props.put("password", "hadoop")

    jsonLines.foreachRDD { rdd =>
      // Skip empty batches: there is no schema to infer and nothing to write.
      if (!rdd.isEmpty()) {
        val df = sqlc.read.json(rdd)
        df.write
          .mode(SaveMode.Append)
          .jdbc("jdbc:mysql://localhost:3306/KafkaDB", "KafkaDB.jsonTable", props)
        df.show()
      }
    }

    ssc.start()
    ssc.awaitTermination()
  }
}

Run the below command:
>spark-shell

scala> :load KafkaMySQL.scala

scala> KafkaMySQL.main(null)

Output:


mysql> select count(*) from jsonTable;
+----------+
| count(*) |
+----------+
|       50 |
+----------+
1 row in set (0.00 sec)
mysql> select * from from jsonTable;
ERROR 1064 (42000): You have an error in your SQL syntax; check the manual that corresponds to your MySQL server version for the right syntax to use near 'from jsonTable' at line 1
mysql> select * from jsonTable;
+------+------------+--------------------------+------------------+-------------------------------+
| id   | name       | city                     | country          | Skills                        |
+------+------------+--------------------------+------------------+-------------------------------+
|    1 | Sharline   | Uppsala                  | Sweden           | Eyelash Extensions            |
|    2 | Marris     | São Domingos             | Cape Verde       | VMI Programs                  |
|    3 | Gregg      | Qaxbaş                   | Azerbaijan       | Historical Research           |
|    4 | Christye   | Guarapari                | Brazil           | Army                          |
|    5 | Modesta    | Paltamo                  | Finland          | Avaya Technologies            |
|    6 | Cletis     | Changxing                | China            | Creativity Skills             |
|    7 | Erica      | Würzburg                 | Germany          | FCAPS                         |
|    8 | Ebeneser   | San Marcelino            | Philippines      | Copywriting                   |
|    9 | Lois       | Banjar Kubu              | Indonesia        | Diabetes                      |
|   10 | Adolf      | Tulsa                    | United States    | XOG                           |
|   11 | Brennen    | Tabuadelo                | Portugal         | Front-end                     |
|   12 | Joseito    | Parreira                 | Portugal         | LDAP                          |
|   13 | Dulcie     | Yitulihe                 | China            | GCS                           |
|   14 | Kathye     | Donostia-San Sebastian   | Spain            | SRDF                          |
|   15 | Mendy      | Duran                    | Philippines      | Ksh                           |
|   17 | Hamil      | Rangpur                  | Bangladesh       | Online Advertising            |
|   18 | Noni       | Wojciechów               | Poland           | XSLT                          |
|   19 | Crystie    | Otutara                  | French Polynesia | CGI                           |
|   20 | Mattias    | Xueshan                  | China            | TCLEOSE Instruction           |
|   21 | Cassius    | Jinhua                   | China            | Switches                      |
|   22 | Elsy       | Yahe                     | China            | Audio Editing                 |
|   23 | Cassie     | Cafayate                 | Argentina        | ZigBee                        |
|   24 | Elvina     | Benito Juarez            | Mexico           | .htaccess                     |
|   25 | Sax        | Yudai                    | China            | Yacht Deliveries              |
|   26 | Somerset   | Oxelösund                | Sweden           | Cfengine                      |
|   27 | Patricia   | Blizne                   | Poland           | RLC                           |
|   28 | Baxie      | Yanino Vtoroye           | Russia           | Konica                        |
|   29 | Corena     | Huallanca                | Peru             | Online Journalism             |
|   30 | Kellen     | Lido                     | Indonesia        | Oleochemicals                 |
|   31 | Ellwood    | Amiens                   | France           | MCSA + Messaging              |
|   32 | Chad       | Igpit                    | Philippines      | iPad Development              |
|   33 | Hannis     | Ubajami                  | Indonesia        | ERP                           |
|   34 | Tilly      | Jutiapa                  | Guatemala        | SDS-PAGE                      |
|   35 | Lothaire   | Šentilj v Slov. Goricah  | Slovenia         | AQL                           |
|   36 | Ysabel     | Promyshlennaya           | Russia           | TNS                           |
|   37 | Darcey     | Dumbéa                   | New Caledonia    | VMware Certified Professional |
|   38 | Fitzgerald | Cha’anpu                 | China            | Switchboard Operator          |
|   39 | Elinor     | Ijebu-Ife                | Nigeria          | AHLTA                         |
|   40 | Ricca      | Ushi                     | Armenia          | IOSH                          |
|   41 | Maryanne   | Ajuda                    | Portugal         | Active DoD Secret Clearance   |
|   42 | Mickie     | Guanmian                 | China            | Myriad                        |
|   43 | Hamil      | Gorgoram                 | Nigeria          | Wound Care                    |
|   44 | Marcille   | Fahraj                   | Iran             | Hydrogeology                  |
|   45 | Karie      | Tunbao                   | China            | Oracle XE                     |
|   46 | Marmaduke  | Anserma                  | Colombia         | XSL-FO                        |
|   47 | Annabell   | Blimbing                 | Indonesia        | JSR 168                       |
|   48 | Felice     | Xiongguan                | China            | Safety Management Systems     |
|   49 | Jolie      | Walakeri                 | Indonesia        | Medical Writing               |
|   50 | Ashlie     | Daiyue                   | China            | EJB                           |
|   51 | Salim      | Rancaseneng              | Indonesia        | MXF                           |
+------+------------+--------------------------+------------------+-------------------------------+
50 rows in set (0.00 sec)

No comments:

Post a Comment

Python Challenges Program

Challenges program: program 1: #Input :ABAABBCA #Output: A4B3C1 str1="ABAABBCA" str2="" d={} for x in str1: d[x]=d...