Kafka Producer in Scala Programming Example
start zookeeper:
hadoop@hadoop:/usr/local/kafka$ bin/zookeeper-server-start.sh config/zookeeper.properties
start kafka server:
hadoop@hadoop:/usr/local/kafka$ bin/kafka-server-start.sh config/server.properties
bin/kafka-topics.sh --create --topic KafkaTopic --partitions 1 --replication-factor 1 --zookeeper localhost:2181
build.properties:
-----------------
sbt.version = 1.2.8
build.sbt dependency packages:
--------------------------------
name := "TwitInt"
version := "0.1"
scalaVersion := "2.11.12"
// https://mvnrepository.com/artifact/org.apache.spark/spark-core
libraryDependencies += "org.apache.spark" %% "spark-core" % "2.4.0"
// https://mvnrepository.com/artifact/org.apache.spark/spark-sql
libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.4.0"
// https://mvnrepository.com/artifact/org.apache.spark/spark-streaming
libraryDependencies += "org.apache.spark" %% "spark-streaming" % "2.4.0"
// https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-twitter
libraryDependencies += "org.apache.spark" %% "spark-streaming-twitter" % "1.6.3"
// https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-kafka
libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka" % "1.6.3"
import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig,ProducerRecord}
object KafkaProducerExa {
def main(args:Array[String]):Unit = {
val props = new Properties()
val topic = "KafkaTopic"
props.put("bootstrap.servers","localhost:9092")
props.put("client.id","ScalaProducer")
props.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer")
props.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer")
val producer = new KafkaProducer[String,String](props)
val msg:String = "Welcome to Kafka : #"
for (i <- 1 to 10){
val data = new ProducerRecord[String,String](topic,msg+i)
producer.send(data)
}
producer.close()
println("------Successfully published messages to topic : " + topic + "----")
}
}
Run the .scala file in IntelliJ IDEA
------Successfully published messages to topic : KafkaTopic----
See the output in console:
--------------------------
hadoop@hadoop:/usr/local/kafka$ bin/kafka-console-consumer.sh --topic KafkaTopic --bootstrap-server localhost:9092
Welcome to Kafka : #1
Welcome to Kafka : #2
Welcome to Kafka : #3
Welcome to Kafka : #4
Welcome to Kafka : #5
Welcome to Kafka : #6
Welcome to Kafka : #7
Welcome to Kafka : #8
Welcome to Kafka : #9
Welcome to Kafka : #10
hadoop@hadoop:/usr/local/kafka$ bin/zookeeper-server-start.sh config/zookeeper.properties
start kafka server:
hadoop@hadoop:/usr/local/kafka$ bin/kafka-server-start.sh config/server.properties
bin/kafka-topics.sh --create --topic KafkaTopic --partitions 1 --replication-factor 1 --zookeeper localhost:2181
build.properties:
-----------------
sbt.version = 1.2.8
build.sbt dependency packages:
--------------------------------
name := "TwitInt"
version := "0.1"
scalaVersion := "2.11.12"
// https://mvnrepository.com/artifact/org.apache.spark/spark-core
libraryDependencies += "org.apache.spark" %% "spark-core" % "2.4.0"
// https://mvnrepository.com/artifact/org.apache.spark/spark-sql
libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.4.0"
// https://mvnrepository.com/artifact/org.apache.spark/spark-streaming
libraryDependencies += "org.apache.spark" %% "spark-streaming" % "2.4.0"
// https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-twitter
libraryDependencies += "org.apache.spark" %% "spark-streaming-twitter" % "1.6.3"
// https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-kafka
libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka" % "1.6.3"
import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig,ProducerRecord}
object KafkaProducerExa {
def main(args:Array[String]):Unit = {
val props = new Properties()
val topic = "KafkaTopic"
props.put("bootstrap.servers","localhost:9092")
props.put("client.id","ScalaProducer")
props.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer")
props.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer")
val producer = new KafkaProducer[String,String](props)
val msg:String = "Welcome to Kafka : #"
for (i <- 1 to 10){
val data = new ProducerRecord[String,String](topic,msg+i)
producer.send(data)
}
producer.close()
println("------Successfully published messages to topic : " + topic + "----")
}
}
Run the .scala file in IntelliJ IDEA
------Successfully published messages to topic : KafkaTopic----
See the output in console:
--------------------------
hadoop@hadoop:/usr/local/kafka$ bin/kafka-console-consumer.sh --topic KafkaTopic --bootstrap-server localhost:9092
Welcome to Kafka : #1
Welcome to Kafka : #2
Welcome to Kafka : #3
Welcome to Kafka : #4
Welcome to Kafka : #5
Welcome to Kafka : #6
Welcome to Kafka : #7
Welcome to Kafka : #8
Welcome to Kafka : #9
Welcome to Kafka : #10
Thanks for sharing the valuable information.
ReplyDeletePython Online Training