Tuesday, March 12, 2019

Kafka Producer in Scala Programming Example

Kafka Producer in Scala Programming Example



start zookeeper: 
hadoop@hadoop:/usr/local/kafka$ bin/zookeeper-server-start.sh config/zookeeper.properties

start kafka server:
hadoop@hadoop:/usr/local/kafka$ bin/kafka-server-start.sh config/server.properties

bin/kafka-topics.sh --create --topic KafkaTopic --partitions 1 --replication-factor 1 --zookeeper localhost:2181


build.properties:
-----------------
sbt.version = 1.2.8

build.sbt dependency packages:
--------------------------------
name := "TwitInt"

version := "0.1"

scalaVersion := "2.11.12"

// https://mvnrepository.com/artifact/org.apache.spark/spark-core
libraryDependencies += "org.apache.spark" %% "spark-core" % "2.4.0"
// https://mvnrepository.com/artifact/org.apache.spark/spark-sql
libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.4.0"
// https://mvnrepository.com/artifact/org.apache.spark/spark-streaming
libraryDependencies += "org.apache.spark" %% "spark-streaming" % "2.4.0"
// https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-twitter
libraryDependencies += "org.apache.spark" %% "spark-streaming-twitter" % "1.6.3"
// https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-kafka
libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka" % "1.6.3"


import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig,ProducerRecord}

object KafkaProducerExa {
  def main(args:Array[String]):Unit = {
    val props = new Properties()
    val topic = "KafkaTopic"
    props.put("bootstrap.servers","localhost:9092")
    props.put("client.id","ScalaProducer")
    props.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer")

    val producer = new KafkaProducer[String,String](props)
    val msg:String = "Welcome to Kafka : #"
    for (i <- 1 to 10){
      val data = new ProducerRecord[String,String](topic,msg+i)
      producer.send(data)
    }
    producer.close()
    println("------Successfully published messages to topic : " + topic + "----")
  }
}


Run the .scala file in IntelliJ IDEA
------Successfully published messages to topic : KafkaTopic----


See the output in console:
--------------------------

hadoop@hadoop:/usr/local/kafka$ bin/kafka-console-consumer.sh --topic KafkaTopic --bootstrap-server localhost:9092

Welcome to Kafka : #1
Welcome to Kafka : #2
Welcome to Kafka : #3
Welcome to Kafka : #4
Welcome to Kafka : #5
Welcome to Kafka : #6
Welcome to Kafka : #7
Welcome to Kafka : #8
Welcome to Kafka : #9
Welcome to Kafka : #10

 

1 comment:

Python Challenges Program

Challenges program: program 1: #Input :ABAABBCA #Output: A4B3C1 str1="ABAABBCA" str2="" d={} for x in str1: d[x]=d...