Tuesday, March 12, 2019

Kafka - Configuring Multiple Servers in Single Linux System

Kafka - Configuring Multiple Servers in Single Linux System



// Here I'm describing how to configure Multiple Kafka Server instances in Single Linux System.
// I will be having 3 different server.properties files with different configurations for each

start zookeeper: 
hadoop@hadoop:/usr/local/kafka$ bin/zookeeper-server-start.sh config/zookeeper.properties

INFO binding to port 0.0.0.0/0.0.0.0:2181 (org.apache.zookeeper.server.NIOServerCnxnFactory)

  default zookeeper port number is : 2181
 
Make a copy of server.properties and rename them into :
  server1.properties,
  server2.properties,
  server3.properties respectively

Open individual file and change the port number mentioned below.

  sudo gedit server1.properties
   broker.id=1
   zoo keeper port : 2181
   log.dirs=/tmp/k1/kafka-logs
   listeners=PLAINTEXT://:9093
  
  sudo gedit server2.properties
   broker.id=2
   zoo keeper port : 2181
   log.dirs=/tmp/k2/kafka-logs
   listeners=PLAINTEXT://:9094
  
  sudo gedit server3.properties
   broker.id=3
   zoo keeper port : 2181
   log.dirs=/tmp/k3/kafka-logs
   listeners=PLAINTEXT://:9095
  
Open 3 new terminals and run each lines in each terminals
#1
hadoop@hadoop:/usr/local/kafka$ bin/kafka-server-start.sh config/server1.properties  
  [2019-03-11 21:50:24,652] INFO [KafkaServer id=1] started (kafka.server.KafkaServer)
#2
hadoop@hadoop:/usr/local/kafka$ bin/kafka-server-start.sh config/server2.properties
  [2019-03-11 21:50:41,541] INFO [KafkaServer id=2] started (kafka.server.KafkaServer)
#3
hadoop@hadoop:/usr/local/kafka$ bin/kafka-server-start.sh config/server3.properties
  [2019-03-11 21:50:55,067] INFO [KafkaServer id=3] started (kafka.server.KafkaServer)

// 3 different kafka server instances are running
hadoop@hadoop:/usr/local/kafka$ jps
5792 QuorumPeerMain
4501 NameNode
7653 Kafka  // #1
7303 Kafka  // #2
4695 DataNode
5417 NodeManager
5228 ResourceManager
8350 Jps
7999 Kafka  // #3
4927 SecondaryNameNode


// Currently running Kafka server instances are 3. But here I specified replication-factor as 4. It got failed

hadoop@hadoop:/usr/local/kafka$ bin/kafka-topics.sh --create --topic topic1 --partitions 3 --replication-factor 4 --zookeeper localhost:2181

Error while executing topic command : Replication factor: 4 larger than available brokers: 3.
[2019-03-11 21:54:58,039] ERROR org.apache.kafka.common.errors.InvalidReplicationFactorException: Replication factor: 4 larger than available brokers: 3.
 (kafka.admin.TopicCommand$)


// Create a new topic named as : topic1 with 3 partitions and replications
hadoop@hadoop:/usr/local/kafka$ bin/kafka-topics.sh --create --topic topic1 --partitions 3 --replication-factor 3 --zookeeper localhost:2181
Created topic "topic1".

// see the topic description
hadoop@hadoop:/usr/local/kafka$ bin/kafka-topics.sh --describe topic1 --zookeeper localhost:2181
Topic:topic1 PartitionCount:3 ReplicationFactor:3 Configs:
Topic: topic1 Partition: 0 Leader: 3 Replicas: 3,1,2 Isr: 3,1,2
Topic: topic1 Partition: 1 Leader: 1 Replicas: 1,2,3 Isr: 1,2,3
Topic: topic1 Partition: 2 Leader: 2 Replicas: 2,3,1 Isr: 2,3,1

// Isr : In Synch Replica

// create a new topic named : topic2 with 2 replicas
hadoop@hadoop:/usr/local/kafka$ bin/kafka-topics.sh --create --topic topic2 --partitions 3 --replication-factor 2 --zookeeper localhost:2181
Created topic "topic2".

// Describe topics
hadoop@hadoop:/usr/local/kafka$ bin/kafka-topics.sh --describe --zookeeper localhost:2181
Topic:topic1 PartitionCount:3 ReplicationFactor:3 Configs: // 3 replicas
Topic: topic1 Partition: 0 Leader: 3 Replicas: 3,1,2 Isr: 3,1,2
Topic: topic1 Partition: 1 Leader: 1 Replicas: 1,2,3 Isr: 1,2,3
Topic: topic1 Partition: 2 Leader: 2 Replicas: 2,3,1 Isr: 2,3,1

Topic:topic2 PartitionCount:3 ReplicationFactor:2 Configs: // 2 replicas
Topic: topic2 Partition: 0 Leader: 1 Replicas: 1,3 Isr: 1,3
Topic: topic2 Partition: 1 Leader: 2 Replicas: 2,1 Isr: 2,1
Topic: topic2 Partition: 2 Leader: 3 Replicas: 3,2 Isr: 3,2


// console producer to send data to broker
hadoop@hadoop:/usr/local/kafka$ bin/kafka-console-producer.sh --topic topic1 --broker-list localhost:9093
>kafka1
>kafka2
>kafka3
>kafka4
>kafka5
>kafka6
>kafka7
>kafka8
>kafka9
>kafka10

// This is : How the data got distributed?
hadoop@hadoop:/usr/local/kafka$ a$ bin/kaftopics.sh --describe topic1 --zookeeper localhost:2181
Topic:topic1 PartitionCount:3 ReplicationFactor:3 Configs:
Topic: topic1 Partition: 0 Leader: 3 Replicas: 3,1,2 Isr: 3,1,2
Topic: topic1 Partition: 1 Leader: 1 Replicas: 1,2,3 Isr: 1,2,3
Topic: topic1 Partition: 2 Leader: 2 Replicas: 2,3,1 Isr: 2,3,1


hadoop@hadoop:/usr/local/kafka$ bin/kafka-console-consumer.sh --topic topic1 --bootstrap-server localhost:9093 --from-beginning
kafka2
kafka5
kafka8

kafka3
kafka6
kafka9
kafka1
kafka4
kafka7
kafka10
^CProcessed a total of 11 messages
hadoop@hadoop:/usr/local/kafka$ bin/kafka-console-consumer.sh --topic topic1 --bootstrap-server localhost:9094 --from-beginning
kafka2
kafka5
kafka8

kafka3
kafka6
kafka9
kafka1
kafka4
kafka7
kafka10
^CProcessed a total of 11 messages
hadoop@hadoop:/usr/local/kafka$ bin/kafka-console-consumer.sh --topic topic1 --bootstrap-server localhost:9095 --from-beginning
kafka2
kafka5
kafka8

kafka3
kafka6
kafka9
kafka1
kafka4
kafka7
kafka10

bin/kafka-console-consumer.sh --topic topic1 --bootstrap-server localhost:9093 --partition 1 --offset earliest
kafka2
kafka5
kafka8




hadoop@hadoop:/usr/local/kafka$ bin/kafka-console-consumer.sh --topic topic1 --bootstrap-server localhost:9093 --partition 0 --offset earliest
kafka3
kafka6
kafka9


hadoop@hadoop:/usr/local/kafka$ bin/kafka-console-consumer.sh --topic topic1 --bootstrap-server localhost:9093 --partition 1 --offset earliest
kafka2
kafka5
kafka8


hadoop@hadoop:/usr/local/kafka$ bin/kafka-console-consumer.sh --topic topic1 --bootstrap-server localhost:9093 --partition 2 --offset earliest
kafka1
kafka4
kafka7
kafka10


Consumer Group:
---------------


If set of consumers are trying to read data from same topic,
but from different partitions these consumers must belongs to same group



sudo gedit consumer.properties
------------------------------

#consume group id
group.id=group1


B - Broker
p - partition
m - message / records

Partition replicated across:
---------------------------

p1 -> B1,B2,B3
p2 -> B2,B3,B1
p3 -> B3,B1,B2

Broker -> Partition -> Messages (round robin)
B1 -> p1 ->> m1,m4,m7
B2 -> p2 ->> m2,m5,m8
B3 -> p3 ->> m3,m6,m9

Offset mapping:
offset (0) -> m1,m2,m3
offset (1) -> m4,m5,m6
offset (2) -> m7, m8, m9



hadoop@hadoop:/usr/local/kafka$  bin/kafka-topics.sh --create --topic topic1 --partitions 3 --replication-factor 3 --zookeeper localhost:2181
Created topic "topic1".
hadoop@hadoop:/usr/local/kafka$ bin/kafka-topics.sh --describe --zookeeper localhost:2181
Topic:topic1 PartitionCount:3 ReplicationFactor:3 Configs:
Topic: topic1 Partition: 0 Leader: 1 Replicas: 1,2,3 Isr: 1,2,3
Topic: topic1 Partition: 1 Leader: 2 Replicas: 2,3,1 Isr: 2,3,1
Topic: topic1 Partition: 2 Leader: 3 Replicas: 3,1,2 Isr: 3,1,2

// producer is writing data into topics

hadoop@hadoop:/usr/local/kafka$ bin/kafka-console-producer.sh --topic topic1 --broker-list localhost:9093
>i love india
>2. i love pakistan
>3. i love south africa
>4. i love srilanka
>5. i love singapore
>6. i love malaysia
>7. i love indonesia
>8. i love thailand
>9. i love australia
>10. i love newzealand
>^Chadoop@hadoop:/usr/local/kafka$ bin/kafka-console-producer.sh --topic topic1 --broker-list localhost:9093
>amala
>paul
>vimal
>kalavani
>oviya

// consumer is reading data from topics
hadoop@hadoop:/usr/local/kafka$ bin/kafka-console-consumer.sh --topic topic1 --bootstrap-server localhost:9093 --from-beginning
i love india
4. i love srilanka
7. i love indonesia
10. i love newzealand
2. i love pakistan
5. i love singapore
8. i love thailand
3. i love south africa
6. i love malaysia
9. i love australia
^CProcessed a total of 10 messages
hadoop@hadoop:/usr/local/kafka$ bin/kafka-console-consumer.sh --topic topic1 --bootstrap-server localhost:9093 --partition 0 --offset earliest
2. i love pakistan
5. i love singapore
8. i love thailand
^CProcessed a total of 3 messages
hadoop@hadoop:/usr/local/kafka$ bin/kafka-console-consumer.sh --topic topic1 --bootstrap-server localhost:9093 --partition 1 --offset earliest
i love india
4. i love srilanka
7. i love indonesia
10. i love newzealand
^CProcessed a total of 4 messages
hadoop@hadoop:/usr/local/kafka$ bin/kafka-console-consumer.sh --topic topic1 --bootstrap-server localhost:9093 --partition 2 --offset earliest
3. i love south africa
6. i love malaysia
9. i love australia
^CProcessed a total of 3 messages
hadoop@hadoop:/usr/local/kafka$ bin/kafka-console-consumer.sh --topic topic1 --bootstrap-server localhost:9093 --partition 2 --offset earliest
3. i love south africa
6. i love malaysia
9. i love australia
paul
oviya


// I forcefully stopped 3rd server : hadoop@hadoop:/usr/local/kafka$ bin/kafka-server-start.sh config/server3.properties
^C
// so 3rd server is failed but zookeeper lets 1st or 2nd kafka server become leader for the new inputs



hadoop@hadoop:/usr/local/kafka$ bin/kafka-console-producer.sh --topic topic1 --broker-list localhost:9093
>i love india
>2. i love pakistan
>3. i love south africa
>4. i love srilanka
>5. i love singapore
>6. i love malaysia
>7. i love indonesia
>8. i love thailand
>9. i love australia
>10. i love newzealand
>^Chadoop@hadoop:/usr/local/kafka$ bin/kafka-console-producer.sh --topic topic1 --broker-list localhost:9093
>amala
>paul
>vimal
>kalavani
>oviya // 3rd server failed here
>super  // new inputs without 3rd server. But 1st or 2nd server become leader for the below inputs
>star
>rajinikanth
>i love india
>kamal
>kalai
>selvam
>awesome
>nator
>mai
>

// consumer displays the results even one of the server (i.e, server3) fails
hadoop@hadoop:/usr/local/kafka$ bin/kafka-console-consumer.sh --topic topic1 --bootstrap-server localhost:9093 --from-beginning
i love india
4. i love srilanka
7. i love indonesia
10. i love newzealand
vimal
super
i love india
selvam
2. i love pakistan
5. i love singapore
8. i love thailand
amala
kalavani
star
kamal
3. i love south africa
6. i love malaysia
9. i love australia
paul
oviya
rajinikanth
kalai
awesome
nator


// now 3rd server is down
hadoop@hadoop:/usr/local/kafka$ bin/kafka-topics.sh --describe --zookeeper localhost:2181
Topic:topic1 PartitionCount:3 ReplicationFactor:3 Configs:
Topic: topic1 Partition: 0 Leader: 1 Replicas: 1,2,3 Isr: 1,2
Topic: topic1 Partition: 1 Leader: 2 Replicas: 2,3,1 Isr: 2,1
Topic: topic1 Partition: 2 Leader: 1 Replicas: 3,1,2 Isr: 1,2

// There are 3 replicas. a) 1,2,3. b) 2,3,1. c) 3,1,2 ==> expected
But actual is : a ) Isr : 1,2. b) Isr : 2,1. c) Isr : 1,2
// Because 3rd server is down - Earlier we forcefully did shutdown


//Now I am manually making server3 up:
hadoop@hadoop:/usr/local/kafka$ bin/kafka-server-start.sh config/server3.properties
// so, 3rd server is up and running now.

// I'm checking it again:
hadoop@hadoop:/usr/local/kafka$ bin/kafka-topics.sh --describe --zookeeper localhost:2181
// Now see the replicas - everything perfect. 3rd server got values from Server1 and Server2

Topic:topic1 PartitionCount:3 ReplicationFactor:3 Configs:
Topic: topic1 Partition: 0 Leader: 1 Replicas: 1,2,3 Isr: 1,2,3
Topic: topic1 Partition: 1 Leader: 2 Replicas: 2,3,1 Isr: 2,1,3
Topic: topic1 Partition: 2 Leader: 1 Replicas: 3,1,2 Isr: 1,2,3

No comments:

Post a Comment

Python Challenges Program

Challenges program: program 1: #Input :ABAABBCA #Output: A4B3C1 str1="ABAABBCA" str2="" d={} for x in str1: d[x]=d...