Thursday, March 14, 2019

Kafka : Call Logs SUCCESS, FAILED, DROPPED - Producer and Consumer Programming in Scala

Kafka : Call Logs SUCCESS, FAILED, DROPPED - Producer and Consumer Programming in Scala



input file:
calllogdata.txt:
-----------------
ec59cea2-5006-448f-a031-d5e53f33be232014-03-15 00:02:482014-03-15 00:06:05DROPPED 80526900577757919463
ec59cea2-5006-448f-a032-d5e53f33be232014-03-15 00:02:482014-03-15 00:06:07DROPPED 98861773759916790556
ec59cea2-5006-448f-a033-d5e53f33be232014-03-16 00:02:482014-03-16 00:06:45SUCCESS 86186279969886177375
ec59cea2-5006-448f-a034-d5e53f33be232014-03-16 00:02:482014-03-16 00:06:53DROPPED 98765156164894949494
ec59cea2-5006-448f-a035-d5e53f33be232014-03-16 00:02:482014-03-16 00:06:12FAILED  54545454546469496477
ec59cea2-5006-448f-a036-d5e53f33be232014-03-16 00:02:482014-03-16 00:06:05SUCCESS 12354678902153698431
ec59cea2-5006-448f-a037-d5e53f33be232014-03-17 00:02:482014-03-17 00:06:05DROPPED 80556456458478787877
ec59cea2-5006-448f-a038-d5e53f33be232014-03-17 00:02:482014-03-17 00:06:05DROPPED 80809056095676236992
ec59cea2-5006-448f-a039-d5e53f33be232014-03-17 00:02:482014-03-17 00:06:05FAILED  44554584848449644469
ec59cea2-5006-448f-a040-d5e53f33be232014-03-18 00:02:482014-03-18 00:06:05DROPPED 96090652158087080806
ec59cea2-5006-448f-a041-d5e53f33be232014-03-18 00:02:482014-03-18 00:06:05DROPPED 89797946465879874615
ec59cea2-5006-448f-a042-d5e53f33be232014-03-18 00:02:482014-03-18 00:06:05SUCCESS 45454545457978978979
ec59cea2-5006-448f-a043-d5e53f33be232014-03-18 00:02:482014-03-18 00:06:05DROPPED 74584564564564564656
ec59cea2-5006-448f-a044-d5e53f33be232014-03-19 00:02:482014-03-19 00:06:05DROPPED 98794894945648947898
ec59cea2-5006-448f-a045-d5e53f33be232014-03-19 00:02:482014-03-19 00:06:05SUCCESS 84645645605646064646
ec59cea2-5006-448f-a046-d5e53f33be232014-03-19 00:02:482014-03-19 00:06:05DROPPED 78545456456456456456
ec59cea2-5006-448f-a047-d5e53f33be232014-03-19 00:02:482014-03-19 00:06:05DROPPED 57554548979797979797
ec59cea2-5006-448f-a048-d5e53f33be232014-03-20 00:02:482014-03-20 00:06:05SUCCESS 87898640989489089409
ec59cea2-5006-448f-a049-d5e53f33be232014-03-20 00:02:482014-03-20 00:06:05SUCCESS 75884848478978978979
ec59cea2-5006-448f-a050-d5e53f33be232014-03-20 00:02:482014-03-20 00:06:05SUCCESS 74894086489489489489
ec59cea2-5006-448f-a031-d5e53f33be232014-03-15 00:02:482014-03-15 00:06:05DROPPED 80526900577757919463
ec59cea2-5006-448f-a032-d5e53f33be232014-03-15 00:02:482014-03-15 00:06:07DROPPED 98861773759916790556
ec59cea2-5006-448f-a033-d5e53f33be232014-03-16 00:02:482014-03-16 00:06:45SUCCESS 86186279969886177375
ec59cea2-5006-448f-a034-d5e53f33be232014-03-16 00:02:482014-03-16 00:06:53DROPPED 98765156164894949494
ec59cea2-5006-448f-a035-d5e53f33be232014-03-16 00:02:482014-03-16 00:06:12FAILED  54545454546469496477
ec59cea2-5006-448f-a036-d5e53f33be232014-03-16 00:02:482014-03-16 00:06:05SUCCESS 12354678902153698431
ec59cea2-5006-448f-a037-d5e53f33be232014-03-17 00:02:482014-03-17 00:06:05DROPPED 80556456458478787877
ec59cea2-5006-448f-a038-d5e53f33be232014-03-17 00:02:482014-03-17 00:06:05DROPPED 80809056095676236992
ec59cea2-5006-448f-a039-d5e53f33be232014-03-17 00:02:482014-03-17 00:06:05FAILED  44554584848449644469
ec59cea2-5006-448f-a040-d5e53f33be232014-03-18 00:02:482014-03-18 00:06:05DROPPED 96090652158087080806
ec59cea2-5006-448f-a041-d5e53f33be232014-03-18 00:02:482014-03-18 00:06:05DROPPED 89797946465879874615
ec59cea2-5006-448f-a042-d5e53f33be232014-03-18 00:02:482014-03-18 00:06:05SUCCESS 45454545457978978979
ec59cea2-5006-448f-a043-d5e53f33be232014-03-18 00:02:482014-03-18 00:06:05DROPPED 74584564564564564656
ec59cea2-5006-448f-a044-d5e53f33be232014-03-19 00:02:482014-03-19 00:06:05DROPPED 98794894945648947898
ec59cea2-5006-448f-a045-d5e53f33be232014-03-19 00:02:482014-03-19 00:06:05SUCCESS 84645645605646064646
ec59cea2-5006-448f-a046-d5e53f33be232014-03-19 00:02:482014-03-19 00:06:05DROPPED 78545456456456456456
ec59cea2-5006-448f-a047-d5e53f33be232014-03-19 00:02:482014-03-19 00:06:05DROPPED 57554548979797979797
ec59cea2-5006-448f-a048-d5e53f33be232014-03-20 00:02:482014-03-20 00:06:05SUCCESS 87898640989489089409
ec59cea2-5006-448f-a049-d5e53f33be232014-03-20 00:02:482014-03-20 00:06:05SUCCESS 75884848478978978979
ec59cea2-5006-448f-a050-d5e53f33be232014-03-20 00:02:482014-03-20 00:06:05SUCCESS 74894086489489489489
ec59cea2-5006-448f-a031-d5e53f33be232014-03-15 00:02:482014-03-15 00:06:05DROPPED 80526900577757919463
ec59cea2-5006-448f-a032-d5e53f33be232014-03-15 00:02:482014-03-15 00:06:07DROPPED 98861773759916790556
ec59cea2-5006-448f-a033-d5e53f33be232014-03-16 00:02:482014-03-16 00:06:45SUCCESS 86186279969886177375
ec59cea2-5006-448f-a034-d5e53f33be232014-03-16 00:02:482014-03-16 00:06:53DROPPED 98765156164894949494
ec59cea2-5006-448f-a035-d5e53f33be232014-03-16 00:02:482014-03-16 00:06:12FAILED  54545454546469496477
ec59cea2-5006-448f-a036-d5e53f33be232014-03-16 00:02:482014-03-16 00:06:05SUCCESS 12354678902153698431
ec59cea2-5006-448f-a037-d5e53f33be232014-03-17 00:02:482014-03-17 00:06:05DROPPED 80556456458478787877
ec59cea2-5006-448f-a038-d5e53f33be232014-03-17 00:02:482014-03-17 00:06:05DROPPED 80809056095676236992
ec59cea2-5006-448f-a039-d5e53f33be232014-03-17 00:02:482014-03-17 00:06:05FAILED  44554584848449644469
ec59cea2-5006-448f-a040-d5e53f33be232014-03-18 00:02:482014-03-18 00:06:05DROPPED 96090652158087080806
ec59cea2-5006-448f-a041-d5e53f33be232014-03-18 00:02:482014-03-18 00:06:05DROPPED 89797946465879874615
ec59cea2-5006-448f-a042-d5e53f33be232014-03-18 00:02:482014-03-18 00:06:05SUCCESS 45454545457978978979
ec59cea2-5006-448f-a043-d5e53f33be232014-03-18 00:02:482014-03-18 00:06:05DROPPED 74584564564564564656
ec59cea2-5006-448f-a044-d5e53f33be232014-03-19 00:02:482014-03-19 00:06:05DROPPED 98794894945648947898
ec59cea2-5006-448f-a045-d5e53f33be232014-03-19 00:02:482014-03-19 00:06:05SUCCESS 84645645605646064646
ec59cea2-5006-448f-a046-d5e53f33be232014-03-19 00:02:482014-03-19 00:06:05DROPPED 78545456456456456456
ec59cea2-5006-448f-a047-d5e53f33be232014-03-19 00:02:482014-03-19 00:06:05DROPPED 57554548979797979797
ec59cea2-5006-448f-a048-d5e53f33be232014-03-20 00:02:482014-03-20 00:06:05SUCCESS 87898640989489089409
ec59cea2-5006-448f-a049-d5e53f33be232014-03-20 00:02:482014-03-20 00:06:05SUCCESS 75884848478978978979
ec59cea2-5006-448f-a050-d5e53f33be232014-03-20 00:02:482014-03-20 00:06:05SUCCESS 74894086489489489489

start zookeeper: 
hadoop@hadoop:/usr/local/kafka$ bin/zookeeper-server-start.sh config/zookeeper.properties

start kafka server:
hadoop@hadoop:/usr/local/kafka$ bin/kafka-server-start.sh config/server.properties


bin/kafka-topics.sh --create --topic SUCCESS_RECORDS --partitions 1 --replication-factor 1 --zookeeper localhost:2181
bin/kafka-topics.sh --create --topic FAILED_RECORDS --partitions 1 --replication-factor 1 --zookeeper localhost:2181
bin/kafka-topics.sh --create --topic DROPPED_RECORDS --partitions 1 --replication-factor 1 --zookeeper localhost:2181


hadoop@hadoop:/usr/local/kafka$ bin/kafka-topics.sh --list --zookeeper localhost:2181
DROPPED_RECORDS
FAILED_RECORDS
SUCCESS_RECORDS


build.properties:
-----------------
sbt.version = 1.2.8

build.sbt dependency packages:
--------------------------------
name := "myOwn"

version := "0.1"

scalaVersion := "2.11.12"
// https://mvnrepository.com/artifact/org.apache.spark/spark-core
libraryDependencies += "org.apache.spark" %% "spark-core" % "2.4.0"
// https://mvnrepository.com/artifact/org.apache.spark/spark-sql
libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.4.0"
// https://mvnrepository.com/artifact/org.apache.spark/spark-streaming
libraryDependencies += "org.apache.spark" %% "spark-streaming" % "2.4.0"
// https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-twitter
libraryDependencies += "org.apache.spark" %% "spark-streaming-twitter" % "1.6.3"
// https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-kafka
libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka" % "1.6.3"
// https://mvnrepository.com/artifact/org.apache.kafka/kafka
libraryDependencies += "org.apache.kafka" %% "kafka" % "2.1.1"



import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig,ProducerRecord}

object KafkaProducerCallLog {
  def main(args:Array[String]):Unit = {
    val props = new Properties()
    val topic = "KafkaTopic"
    props.put("bootstrap.servers","localhost:9092")
    props.put("client.id","ProducerApp")
    props.put("batch.size","32768")
    props.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer")

    val topicSUCCESS = "SUCCESS_RECORDS"
    val topicFAILED = "FAILED_RECORDS"
    val topicDROPPED = "DROPPED_RECORDS"

    val producer = new KafkaProducer[String,String](props)

    val file = scala.io.Source.fromFile("/home/hadoop/Desktop/vow/calllogdata.txt")

    for (line <- file.getLines()) {
      val status_pattern = "(SUCCESS|FAILED|DROPPED)".r
      val status = status_pattern.findFirstIn(line).get
      if (status == "SUCCESS") {
        val msg = new ProducerRecord[String,String](topicSUCCESS,line)
        producer.send(msg)
        println(msg)
      }
      else if (status == "FAILED") {
        val msg = new ProducerRecord[String,String](topicFAILED,line)
        producer.send(msg)
        println(msg)
      }
      else
      {
        val msg = new ProducerRecord[String,String](topicDROPPED,line)
        producer.send(msg)
        println(msg)
      }
    }
    producer.close()
  }
}




 
bin/kafka-console-consumer.sh --topic SUCCESS_RECORDS --bootstrap-server localhost:9092 --from-beginning

bin/kafka-console-consumer.sh --topic FAILED_RECORDS --bootstrap-server localhost:9092 --from-beginning
ec59cea2-5006-448f-a035-d5e53f33be232014-03-16 00:02:482014-03-16 00:06:12FAILED  54545454546469496477
ec59cea2-5006-448f-a039-d5e53f33be232014-03-17 00:02:482014-03-17 00:06:05FAILED  44554584848449644469
ec59cea2-5006-448f-a035-d5e53f33be232014-03-16 00:02:482014-03-16 00:06:12FAILED  54545454546469496477
ec59cea2-5006-448f-a039-d5e53f33be232014-03-17 00:02:482014-03-17 00:06:05FAILED  44554584848449644469
ec59cea2-5006-448f-a035-d5e53f33be232014-03-16 00:02:482014-03-16 00:06:12FAILED  54545454546469496477
ec59cea2-5006-448f-a039-d5e53f33be232014-03-17 00:02:482014-03-17 00:06:05FAILED  44554584848449644469


bin/kafka-console-consumer.sh --topic DROPPED_RECORDS --bootstrap-server localhost:9092 --from-beginning


import java.util.{Collections, Properties}
import org.apache.kafka.clients.consumer.KafkaConsumer

import scala.collection.JavaConversions._

object KafkaConsumerExa1 {
  def main(args: Array[String]): Unit = {
    val properties = new Properties()
    properties.put("bootstrap.servers", "127.0.0.1:9092")
    properties.put("group.id", "testGroup1")

    properties.put("client.id", "ConsumerApp")
    // properties.put("partition.assignment.strategy", "range");



    properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    val consumer = new KafkaConsumer[String, String](properties)
    val topic = "FAILED_RECORDS"


    consumer.subscribe(Collections.singletonList(topic))

    System.out.println("Subscribed to topic " + topic)
 
   var records = consumer.poll(4000)
     consumer.seekToBeginning(consumer.assignment)
    records = consumer.poll(4000)
      for (record <- records.iterator()){
        println("Received Message : "  + record)
      }
 
    consumer.commitSync()
  }
}

No comments:

Post a Comment

Python Challenges Program

Challenges program: program 1: #Input :ABAABBCA #Output: A4B3C1 str1="ABAABBCA" str2="" d={} for x in str1: d[x]=d...