input file:
nameList.json:
--------------
{"id":992,"name":"Herman"," city":"Iles","country":" Colombia","Skills":"CVE"},
{"id":993,"name":"Burton"," city":"Santo Tomas","country":"Philippines" ,"Skills":"VMware vSphere"},
{"id":994,"name":"Correna"," city":"Shirgjan","country":" Albania","Skills":"Wyse"},
{"id":995,"name":"Cathi"," city":"Dorūd","country":"Iran" ,"Skills":"SSCP"},
{"id":996,"name":"Lena","city" :"Al Judayrah","country":" Palestinian Territory","Skills":" Commercial Kitchen Design"},
{"id":997,"name":"Madalena"," city":"Livadiya","country":" Ukraine","Skills":"Software Development"},
{"id":998,"name":"Jo-anne"," city":"Khatsyezhyna","country" :"Belarus","Skills":"TPD"},
{"id":999,"name":"Georgi"," city":"Pasuruan","country":" Indonesia","Skills":"Project Engineering"},
{"id":1000,"name":"Scott"," city":"Gyumri","country":" Armenia","Skills":"RHEV"}
start zookeeper:
hadoop@hadoop:/usr/local/ kafka$ bin/zookeeper-server-start.sh config/zookeeper.properties
start kafka server:
hadoop@hadoop:/usr/local/ kafka$ bin/kafka-server-start.sh config/server.properties
bin/kafka-topics.sh --create --topic jsonTopic --partitions 1 --replication-factor 1 --zookeeper localhost:2181
//view the topics available in Kafka Broker
hadoop@hadoop:/usr/local/ kafka$ bin/kafka-topics.sh --list --zookeeper localhost:2181
jsonTopic
// create a database and table in MySQL:
hadoop@hadoop:/usr/local/ kafka$ sudo mysql;
[sudo] password for hadoop:
Welcome to the MySQL monitor. Commands end with ; or \g.
Your MySQL connection id is 4
Server version: 5.7.25-0ubuntu0.18.10.2 (Ubuntu)
Copyright (c) 2000, 2019, Oracle and/or its affiliates. All rights reserved.
Oracle is a registered trademark of Oracle Corporation and/or its
affiliates. Other names may be trademarks of their respective
owners.
Type 'help;' or '\h' for help. Type '\c' to clear the current input statement.
mysql> create database KafkaDB;
Query OK, 1 row affected (0.05 sec)
mysql> use KafkaDB;
Database changed
mysql> create table jsonTable (id int, name varchar(50), city varchar(50), country varchar(50), skills varchar(50));
Query OK, 0 rows affected (0.20 sec)
//Produce reads json file and publish them in kafka topic
Producer Programming in Scala:
------------------------------ -
import java.util.Properties
import org.apache.kafka.clients. producer.{KafkaProducer, ProducerConfig,ProducerRecord}
object JsonPublisherExa {
def main(args:Array[String]):Unit = {
val props = new Properties()
props.put("bootstrap.servers", "localhost:9092")
props.put("acks","all")
props.put("client.id"," ProducerApp")
props.put("retries","4")
props.put("batch.size","32768" )
props.put("key.serializer"," org.apache.kafka.common. serialization. StringSerializer")
props.put("value.serializer"," org.apache.kafka.common. serialization. StringSerializer")
val topic = "jsonTopic"
val producer = new KafkaProducer[String,String]( props)
val file = scala.io.Source.fromFile("/ home/hadoop/Desktop/vow/ nameList.json")
for (line <- file.getLines()) {
val msg = new ProducerRecord[String,String]( topic,line)
producer.send(msg)
}
producer.close()
}
}
Run the below command:
>spark shell
>scala :load JsonPublisherExa .scala
>scala JsonPublisherExa..main(null)
// Run the Producer Program in IntelliJ IDEA
hadoop@hadoop:/usr/local/ kafka$ bin/kafka-console-consumer.sh --topic jsonTopic --bootstrap-server localhost:9092 --from-beginning
{"id":1,"name":"Sharline"," city":"Uppsala","country":" Sweden","Skills":"Eyelash Extensions"},
{"id":2,"name":"Marris","city" :"São Domingos","country":"Cape Verde","Skills":"VMI Programs"},
{"id":3,"name":"Gregg","city": "Qaxbaş","country":" Azerbaijan","Skills":" Historical Research"},
{"id":4,"name":"Christye"," city":"Guarapari","country":" Brazil","Skills":"Army"},
{"id":5,"name":"Modesta"," city":"Paltamo","country":" Finland","Skills":"Avaya Technologies"},
sc.stop()
import org.apache.spark.SparkConf
import org.apache.spark.streaming. StreamingContext
import org.apache.spark.streaming. Seconds
import org.apache.spark.streaming. kafka.KafkaUtils
import org.apache.log4j.Logger
import org.apache.log4j.Level
import java.util.Properties
import org.apache.spark.sql.{ SQLContext,SaveMode}
import org.apache.spark.streaming.{ Seconds,StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}
object KafkaMySQL{
def main(args:Array[String]):Unit ={
Logger.getLogger("org"). setLevel(Level.OFF)
Logger.getLogger("akka"). setLevel(Level.OFF)
val conf = new SparkConf().setAppName(" SparkStreamingJson"). setMaster("local[2]")
val sc = new SparkContext(conf)
val sqlc = new SQLContext(sc)
val batchInterval = 20
val zk = "localhost:2181"
val consumerGroupID = "jsonGroup"
val topic = "jsonTopic"
val partition = 0
val perTopicPartitions = Map(topic -> partition)
val ssc = new StreamingContext(sc,Seconds( batchInterval))
val KafkaData = KafkaUtils.createStream(ssc, zk,consumerGroupID, perTopicPartitions)
val ks = KafkaData.map (x => x._2)
ks.foreachRDD { x =>
val df = sqlc.read.json(x)
val props = new Properties()
props.put("driver","com.mysql. jdbc.Driver")
props.put("user","hadoop")
props.put("password","hadoop")
df.write.mode(SaveMode.Append) .jdbc("jdbc:mysql://localhost: 3306/KafkaDB","KafkaDB. jsonTable",props)
df.show()
}
ssc.start()
ssc.awaitTermination()
}
}
Run the below command:
>spark shell
Run the below command:
>spark shell
>scala :load KafkaMySQL.scala
>scala KafkaMySQL.main(null)
Output:
mysql> select count(*) from jsonTable;
+----------+
| count(*) |
+----------+
| 50 |
+----------+
1 row in set (0.00 sec)
mysql> select * from from jsonTable;
ERROR 1064 (42000): You have an error in your SQL syntax; check the manual that corresponds to your MySQL server version for the right syntax to use near 'from jsonTable' at line 1
mysql> select * from jsonTable;
+------+------------+--------------------------+------------------+-------------------------------+
| id | name | city | country | Skills |
+------+------------+--------------------------+------------------+-------------------------------+
| 1 | Sharline | Uppsala | Sweden | Eyelash Extensions |
| 2 | Marris | São Domingos | Cape Verde | VMI Programs |
| 3 | Gregg | Qaxbaş | Azerbaijan | Historical Research |
| 4 | Christye | Guarapari | Brazil | Army |
| 5 | Modesta | Paltamo | Finland | Avaya Technologies |
| 6 | Cletis | Changxing | China | Creativity Skills |
| 7 | Erica | Würzburg | Germany | FCAPS |
| 8 | Ebeneser | San Marcelino | Philippines | Copywriting |
| 9 | Lois | Banjar Kubu | Indonesia | Diabetes |
| 10 | Adolf | Tulsa | United States | XOG |
| 11 | Brennen | Tabuadelo | Portugal | Front-end |
| 12 | Joseito | Parreira | Portugal | LDAP |
| 13 | Dulcie | Yitulihe | China | GCS |
| 14 | Kathye | Donostia-San Sebastian | Spain | SRDF |
| 15 | Mendy | Duran | Philippines | Ksh |
| 17 | Hamil | Rangpur | Bangladesh | Online Advertising |
| 18 | Noni | Wojciechów | Poland | XSLT |
| 19 | Crystie | Otutara | French Polynesia | CGI |
| 20 | Mattias | Xueshan | China | TCLEOSE Instruction |
| 21 | Cassius | Jinhua | China | Switches |
| 22 | Elsy | Yahe | China | Audio Editing |
| 23 | Cassie | Cafayate | Argentina | ZigBee |
| 24 | Elvina | Benito Juarez | Mexico | .htaccess |
| 25 | Sax | Yudai | China | Yacht Deliveries |
| 26 | Somerset | Oxelösund | Sweden | Cfengine |
| 27 | Patricia | Blizne | Poland | RLC |
| 28 | Baxie | Yanino Vtoroye | Russia | Konica |
| 29 | Corena | Huallanca | Peru | Online Journalism |
| 30 | Kellen | Lido | Indonesia | Oleochemicals |
| 31 | Ellwood | Amiens | France | MCSA + Messaging |
| 32 | Chad | Igpit | Philippines | iPad Development |
| 33 | Hannis | Ubajami | Indonesia | ERP |
| 34 | Tilly | Jutiapa | Guatemala | SDS-PAGE |
| 35 | Lothaire | Šentilj v Slov. Goricah | Slovenia | AQL |
| 36 | Ysabel | Promyshlennaya | Russia | TNS |
| 37 | Darcey | Dumbéa | New Caledonia | VMware Certified Professional |
| 38 | Fitzgerald | Cha’anpu | China | Switchboard Operator |
| 39 | Elinor | Ijebu-Ife | Nigeria | AHLTA |
| 40 | Ricca | Ushi | Armenia | IOSH |
| 41 | Maryanne | Ajuda | Portugal | Active DoD Secret Clearance |
| 42 | Mickie | Guanmian | China | Myriad |
| 43 | Hamil | Gorgoram | Nigeria | Wound Care |
| 44 | Marcille | Fahraj | Iran | Hydrogeology |
| 45 | Karie | Tunbao | China | Oracle XE |
| 46 | Marmaduke | Anserma | Colombia | XSL-FO |
| 47 | Annabell | Blimbing | Indonesia | JSR 168 |
| 48 | Felice | Xiongguan | China | Safety Management Systems |
| 49 | Jolie | Walakeri | Indonesia | Medical Writing |
| 50 | Ashlie | Daiyue | China | EJB |
| 51 | Salim | Rancaseneng | Indonesia | MXF |
+------+------------+--------------------------+------------------+-------------------------------+
50 rows in set (0.00 sec)
No comments:
Post a Comment