Tuesday, February 19, 2019

Kafka Console Producer and Consumer Step by Step

Kafka
-------
  ZooKeeper
  Broker
   Topic
    Partition
     Replica
    
  Producer
 
  Consumer
 
[cloudera@quickstart bin]$ su
Password: cloudera

[root@quickstart bin]# pwd
/home/cloudera/kafka_2.11-1.0.0/bin

  [root@quickstart kafka_2.11-1.0.0]# cd config

  [root@quickstart config]# ls

   connect-console-sink.properties    consumer.properties
   connect-console-source.properties  log4j.properties
   connect-distributed.properties     producer.properties
   connect-file-sink.properties       server.properties
   connect-file-source.properties     tools-log4j.properties
   connect-log4j.properties           zookeeper.properties
   connect-standalone.properties

  
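Every broker must connect to ZooKeeper on the same port that ZooKeeper listens on: the port in zookeeper.connect (config/server.properties) has to match clientPort (config/zookeeper.properties).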
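A quick way to verify this before starting anything is to compare the two settings directly. The sketch below is only an illustration: the helper name check_zk_port is made up, and it assumes zookeeper.connect holds a single host:port entry with no chroot path, as in the stock server.properties.

```shell
# check_zk_port is a hypothetical helper, not part of Kafka.
# Usage: check_zk_port <zookeeper.properties> <server.properties>
check_zk_port() {
    # Port ZooKeeper listens on, e.g. "clientPort=2182" -> 2182
    zk_port=$(sed -n 's/^clientPort=//p' "$1")
    # Port the broker connects to, e.g. "zookeeper.connect=localhost:2182" -> 2182
    # (assumption: one host:port entry, no chroot path after the port)
    broker_port=$(sed -n 's/^zookeeper\.connect=.*:\([0-9][0-9]*\)$/\1/p' "$2")
    if [ -n "$zk_port" ] && [ "$zk_port" = "$broker_port" ]; then
        echo "OK: both files use port $zk_port"
    else
        echo "MISMATCH: ZooKeeper=$zk_port broker=$broker_port"
        return 1
    fi
}
```

From the Kafka installation directory it could be called as: check_zk_port config/zookeeper.properties config/server.properties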


Spark Streaming and Kafka Integration steps:

1. Start the ZooKeeper server
2. Start the Kafka brokers (one or more)
3. Create a topic
4. Start the console producer (to write messages into the broker's topic)
5. Start the console consumer (to test)
6. Create a Spark streaming context that streams from the Kafka topic
7. Perform transformations or aggregations
8. Run an output operation


[root@quickstart kafka_2.11-1.0.0]# gedit config/zookeeper.properties
  change the port number (clientPort) from 2181 to 2182
  save the file and close it
  
   pwd
   /home/cloudera/kafka_2.11-1.0.0
   [root@quickstart kafka_2.11-1.0.0]#
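
If no graphical editor is available, the same port change can be made from the shell. This is a sketch, assuming GNU sed (for -i) and a file that already contains a clientPort= line, as the stock zookeeper.properties does; the function name set_zk_client_port is invented for this example.

```shell
# set_zk_client_port is a hypothetical helper, not part of Kafka.
# Usage: set_zk_client_port <zookeeper.properties> <port>
set_zk_client_port() {
    # Keep a backup copy, then rewrite the clientPort line in place (GNU sed).
    cp "$1" "$1.bak"
    sed -i "s/^clientPort=.*/clientPort=$2/" "$1"
    # Print the resulting line so the change can be confirmed.
    grep '^clientPort=' "$1"
}
```

For the change described above: set_zk_client_port config/zookeeper.properties 2182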

Start the ZooKeeper server:

Step #1:  sh bin/zookeeper-server-start.sh config/zookeeper.properties
   (at the end of the output: [2018-10-20 01:55:47,072] INFO binding to port 0.0.0.0/0.0.0.0:2182 (org.apache.zookeeper.server.NIOServerCnxnFactory))
  
 
  [root@quickstart kafka_2.11-1.0.0]# gedit config/server.properties
   change zookeeper.connect=localhost:2181 ==> zookeeper.connect=localhost:2182
 
Start the Kafka server:

Step #2: sh bin/kafka-server-start.sh config/server.properties
  (at the end of the output: INFO [KafkaServer id=0] started (kafka.server.KafkaServer))

Create a Kafka topic:

Step #3: sh bin/kafka-topics.sh --create  --zookeeper quickstart.cloudera:2182 --replication-factor 1 --partitions 3 --topic testtopic

   sh bin/kafka-topics.sh --create  --zookeeper quickstart.cloudera:2182 --replication-factor 1 --partitions 3 --topic mytopic

   Created topic "mytopic".


List the topics available in the Kafka broker:

Step #4: sh bin/kafka-topics.sh --list --zookeeper localhost:2182
  mytopic
  testtopic
 
Write messages into the Kafka topic with the Kafka console producer:

Step #5: sh bin/kafka-console-producer.sh --broker-list localhost:9092 --topic mytopic
  >hello
  >how are you
  >bye
 
Read the messages back with the Kafka console consumer:

Step #6: sh bin/kafka-console-consumer.sh --zookeeper quickstart.cloudera:2182 --topic mytopic --from-beginning

Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper].

bye
hello
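how are you

(The messages come back in a different order than they were typed, most likely because mytopic has 3 partitions: Kafka preserves order only within a single partition.)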
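
The deprecation warning above names the replacement: pass --bootstrap-server with a broker address instead of --zookeeper. A sketch of the equivalent call, assuming the broker listens on localhost:9092 (the address used for the producer in step #5); the wrapper function name is made up for illustration.

```shell
# consume_from_beginning is a hypothetical wrapper, not part of Kafka.
# Run it from the Kafka installation directory.
# Usage: consume_from_beginning <broker host:port> <topic>
consume_from_beginning() {
    # The new consumer talks to the broker directly, so no ZooKeeper address is passed.
    sh bin/kafka-console-consumer.sh --bootstrap-server "$1" --topic "$2" --from-beginning
}

# Example (needs a running broker):
#   consume_from_beginning localhost:9092 mytopic
```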
