Tuesday, March 12, 2019

Kafka : Producer and Consumer Programming in Scala with a Multi-Broker Configuration



// In this post we write a Kafka producer and a Kafka consumer in Scala, running against a cluster of four Kafka brokers on one machine

Start ZooKeeper:
hadoop@hadoop:/usr/local/kafka$ bin/zookeeper-server-start.sh config/zookeeper.properties

INFO binding to port 0.0.0.0/0.0.0.0:2181 (org.apache.zookeeper.server.NIOServerCnxnFactory)

  The default ZooKeeper port is 2181.
 
Make four copies of server.properties named server0.properties, server1.properties, server2.properties and server3.properties.

Open each file and give every broker a unique broker.id, log directory and listener port, as shown below. All four brokers register with the same ZooKeeper instance on port 2181.

sudo gedit server0.properties
   broker.id=0
   zookeeper.connect=localhost:2181
   log.dirs=/tmp/k0/kafka-logs
   listeners=PLAINTEXT://:9090

  sudo gedit server1.properties
   broker.id=1
   zookeeper.connect=localhost:2181
   log.dirs=/tmp/k1/kafka-logs
   listeners=PLAINTEXT://:9091

  sudo gedit server2.properties
   broker.id=2
   zookeeper.connect=localhost:2181
   log.dirs=/tmp/k2/kafka-logs
   listeners=PLAINTEXT://:9092

  sudo gedit server3.properties
   broker.id=3
   zookeeper.connect=localhost:2181
   log.dirs=/tmp/k3/kafka-logs
   listeners=PLAINTEXT://:9093
  
Open four new terminals and run one of the following commands in each terminal:
hadoop@hadoop:/usr/local/kafka$ bin/kafka-server-start.sh config/server0.properties
INFO [KafkaServer id=0] started (kafka.server.KafkaServer)
hadoop@hadoop:/usr/local/kafka$ bin/kafka-server-start.sh config/server1.properties  
INFO [KafkaServer id=1] started (kafka.server.KafkaServer)
hadoop@hadoop:/usr/local/kafka$ bin/kafka-server-start.sh config/server2.properties
INFO [KafkaServer id=2] started (kafka.server.KafkaServer)
hadoop@hadoop:/usr/local/kafka$ bin/kafka-server-start.sh config/server3.properties
INFO [KafkaServer id=3] started (kafka.server.KafkaServer)


// 4 Kafka broker instances are now running (QuorumPeerMain is the ZooKeeper process)
hadoop@hadoop:/usr/local/kafka$ jps
6384 Jps
4851 Kafka  // #instance 1
6243 Main
4163 Kafka // #instance 2
3492 QuorumPeerMain
4504 Kafka // #instance 3
5196 Kafka // #instance 4
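
// Besides jps, you can verify that all four brokers joined the cluster programmatically.
// Below is a minimal sketch using the AdminClient from the kafka-clients library; the
// object name ClusterCheck and the bootstrap address are assumptions for illustration.

import java.util.Properties
import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig}
import scala.collection.JavaConversions._

// Hypothetical verification utility: lists the broker ids known to the cluster
object ClusterCheck {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9090") // assumed address
    val admin = AdminClient.create(props)
    val nodes = admin.describeCluster().nodes().get() // blocks until cluster metadata arrives
    println("Brokers in cluster: " + nodes.map(_.id()).toList.sorted.mkString(", "))
    admin.close()
  }
}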

// Create a new topic named myTopic with 4 partitions and a replication factor of 4
hadoop@hadoop:/usr/local/kafka$  bin/kafka-topics.sh --create --topic myTopic --partitions 4 --replication-factor 4 --zookeeper localhost:2181
Created topic "myTopic".

// Describe the topic myTopic: each of the 4 partitions has a leader broker and 4 in-sync replicas (Isr)
bin/kafka-topics.sh --describe --topic myTopic --zookeeper localhost:2181
Topic:myTopic PartitionCount:4 ReplicationFactor:4 Configs:
Topic: myTopic Partition: 0 Leader: 3 Replicas: 3,0,1,2 Isr: 3,0,1,2
Topic: myTopic Partition: 1 Leader: 0 Replicas: 0,1,2,3 Isr: 0,1,2,3
Topic: myTopic Partition: 2 Leader: 1 Replicas: 1,2,3,0 Isr: 1,2,3,0
Topic: myTopic Partition: 3 Leader: 2 Replicas: 2,3,0,1 Isr: 2,3,0,1
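
// The topic can also be created programmatically instead of via the CLI. Here is a
// minimal sketch using the AdminClient API from kafka-clients; the object name
// KafkaTopicAdmin and the bootstrap address are assumptions for illustration.

import java.util.{Collections, Properties}
import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig, NewTopic}

// Hypothetical helper: creates myTopic with 4 partitions and replication factor 4
object KafkaTopicAdmin {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9090") // assumed address

    val admin = AdminClient.create(props)
    val topic = new NewTopic("myTopic", 4, 4.toShort) // name, partitions, replication factor
    admin.createTopics(Collections.singletonList(topic)).all().get() // block until created
    println("Created topic myTopic")
    admin.close()
  }
}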


project/build.properties:
-----------------
sbt.version = 1.2.8

build.sbt (library dependencies):
--------------------------------
name := "Kafka"

version := "0.1"

scalaVersion := "2.11.12"
// https://mvnrepository.com/artifact/org.apache.spark/spark-core
libraryDependencies += "org.apache.spark" %% "spark-core" % "2.4.0"
// https://mvnrepository.com/artifact/org.apache.spark/spark-sql
libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.4.0"
// https://mvnrepository.com/artifact/org.apache.spark/spark-streaming
libraryDependencies += "org.apache.spark" %% "spark-streaming" % "2.4.0"
// https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-twitter
libraryDependencies += "org.apache.spark" %% "spark-streaming-twitter" % "1.6.3"
// https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-kafka
libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka" % "1.6.3"
// https://mvnrepository.com/artifact/org.apache.kafka/kafka
libraryDependencies += "org.apache.kafka" %% "kafka" % "2.1.1"



// Producer Programming in Scala
import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

object KafkaProducerExa2 {
  def main(args: Array[String]): Unit = {

    val props = new Properties()
    val topic = "myTopic"

    // All four brokers are listed so the producer can bootstrap from any of them
    props.put("bootstrap.servers", "192.168.0.106:9090,192.168.0.106:9091,192.168.0.106:9092,192.168.0.106:9093")
    props.put("acks", "all")          // wait until all in-sync replicas acknowledge each write
    props.put("client.id", "ProducerApp")
    props.put("retries", "4")         // retry transient send failures up to 4 times
    props.put("batch.size", "32768")  // batch up to 32 KB of records per partition

    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

    val producer = new KafkaProducer[String, String](props)
    val msg: String = "Welcome to Kafka : #"
    for (i <- 1 to 10) {
      // No key is given, so records are spread round-robin over the 4 partitions
      val data = new ProducerRecord[String, String](topic, msg + i.toString)
      producer.send(data) // asynchronous; close() below flushes pending sends
    }
    producer.close()
    println("------Successfully published messages to topic : " + topic + "----")
  }
}


Run the program in IntelliJ IDEA.

------Successfully published messages to topic : myTopic----
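
send() is asynchronous and returns a Future, so the loop above gets no per-record confirmation. If you want to log success or failure for each record, you can pass a Callback to send(). Below is a minimal sketch of this variant; the object name KafkaProducerWithCallback and the broker address are assumptions, and the serializer settings match the producer above.

import java.util.Properties
import org.apache.kafka.clients.producer.{Callback, KafkaProducer, ProducerRecord, RecordMetadata}

object KafkaProducerWithCallback {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", "192.168.0.106:9090") // assumed address, as above
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

    val producer = new KafkaProducer[String, String](props)
    val record = new ProducerRecord[String, String]("myTopic", "Welcome to Kafka : #11")
    producer.send(record, new Callback {
      // Invoked once the broker acknowledges (or the send fails permanently)
      override def onCompletion(metadata: RecordMetadata, exception: Exception): Unit = {
        if (exception != null) println("Send failed: " + exception.getMessage)
        else println(s"Sent to partition ${metadata.partition()} at offset ${metadata.offset()}")
      }
    })
    producer.close() // flushes the pending send so the callback fires
  }
}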



// View the output with the console consumer. The messages interleave across the 4 partitions, since Kafka only guarantees ordering within a partition:
hadoop@hadoop:/usr/local/kafka$ bin/kafka-console-consumer.sh --topic myTopic --bootstrap-server localhost:9090 --from-beginning
Welcome to Kafka : #3
Welcome to Kafka : #7
Welcome to Kafka : #1
Welcome to Kafka : #5
Welcome to Kafka : #9
Welcome to Kafka : #2
Welcome to Kafka : #6
Welcome to Kafka : #10
Welcome to Kafka : #4
Welcome to Kafka : #8



// Consumer Programming in Scala
import java.util.{Collections, Properties}
import org.apache.kafka.clients.consumer.KafkaConsumer

import scala.collection.JavaConversions._

object KafkaConsumerExa1 {
  def main(args: Array[String]): Unit = {
    val properties = new Properties()
    properties.put("bootstrap.servers", "192.168.0.106:9091")
    properties.put("group.id", "testGroup")
    properties.put("client.id", "ConsumerApp")

    properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    val consumer = new KafkaConsumer[String, String](properties)
    val topic = "myTopic"

    consumer.subscribe(Collections.singletonList(topic))
    System.out.println("Subscribed to topic " + topic)
    // The first poll joins the consumer group and receives the partition assignment
    var records = consumer.poll(4000)
    // Rewind to the start of every assigned partition so all messages are re-read
    consumer.seekToBeginning(consumer.assignment)
    records = consumer.poll(4000)
    for (record <- records.iterator()) {
      println("Received Message : " + record)
    }
    consumer.commitSync() // persist the consumed offsets for this group
  }
}



Output:
--------
Subscribed to topic myTopic
Received Message : ConsumerRecord(topic = myTopic, partition = 0, leaderEpoch = 1, offset = 4, CreateTime = 1552402871872, serialized key size = -1, serialized value size = 21, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = Welcome to Kafka : #4)
Received Message : ConsumerRecord(topic = myTopic, partition = 0, leaderEpoch = 1, offset = 5, CreateTime = 1552402871874, serialized key size = -1, serialized value size = 21, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = Welcome to Kafka : #8)
Received Message : ConsumerRecord(topic = myTopic, partition = 3, leaderEpoch = 2, offset = 6, CreateTime = 1552402871872, serialized key size = -1, serialized value size = 21, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = Welcome to Kafka : #2)
Received Message : ConsumerRecord(topic = myTopic, partition = 3, leaderEpoch = 2, offset = 7, CreateTime = 1552402871874, serialized key size = -1, serialized value size = 21, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = Welcome to Kafka : #6)
Received Message : ConsumerRecord(topic = myTopic, partition = 3, leaderEpoch = 2, offset = 8, CreateTime = 1552402871874, serialized key size = -1, serialized value size = 22, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = Welcome to Kafka : #10)
Received Message : ConsumerRecord(topic = myTopic, partition = 1, leaderEpoch = 1, offset = 6, CreateTime = 1552402871872, serialized key size = -1, serialized value size = 21, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = Welcome to Kafka : #3)
Received Message : ConsumerRecord(topic = myTopic, partition = 1, leaderEpoch = 1, offset = 7, CreateTime = 1552402871874, serialized key size = -1, serialized value size = 21, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = Welcome to Kafka : #7)
Received Message : ConsumerRecord(topic = myTopic, partition = 2, leaderEpoch = 1, offset = 4, CreateTime = 1552402871830, serialized key size = -1, serialized value size = 21, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = Welcome to Kafka : #1)
Received Message : ConsumerRecord(topic = myTopic, partition = 2, leaderEpoch = 1, offset = 5, CreateTime = 1552402871872, serialized key size = -1, serialized value size = 21, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = Welcome to Kafka : #5)
Received Message : ConsumerRecord(topic = myTopic, partition = 2, leaderEpoch = 1, offset = 6, CreateTime = 1552402871874, serialized key size = -1, serialized value size = 21, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = Welcome to Kafka : #9)
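
The example above polls once and exits. A long-running consumer normally polls in a loop and commits as it goes. Below is a minimal sketch of that pattern, reusing the same assumed broker address and group id; the object name KafkaConsumerLoop is illustrative.

import java.time.Duration
import java.util.{Collections, Properties}
import org.apache.kafka.clients.consumer.KafkaConsumer
import scala.collection.JavaConversions._

object KafkaConsumerLoop {
  def main(args: Array[String]): Unit = {
    val properties = new Properties()
    properties.put("bootstrap.servers", "192.168.0.106:9091") // assumed address, as above
    properties.put("group.id", "testGroup")
    properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")

    val consumer = new KafkaConsumer[String, String](properties)
    consumer.subscribe(Collections.singletonList("myTopic"))
    try {
      while (true) {
        // Block up to one second waiting for new records
        val records = consumer.poll(Duration.ofSeconds(1))
        for (record <- records.iterator())
          println(s"partition=${record.partition()} offset=${record.offset()} value=${record.value()}")
        consumer.commitSync() // commit after each batch is processed
      }
    } finally {
      consumer.close()
    }
  }
}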
