Producer and Consumer Programming in Spark Streaming
Kafka_Producer.scala:
---------------------
package Kafka_Stream

import java.util.Properties
import scala.util.Random
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

object Kafka_Producer {
  def main(args: Array[String]): Unit = {
    val brokers = "localhost:9092"
    // change the topic name if required
    val topic = "kafka-sparkstream-test"
    val messagesPerSec = 1
    val wordsPerMessage = 15
    val random = new Random()
    val names = Seq("Hi", "Hello", "How", "are", "you", "test", "yes", "no", "fine", "great", "ok", "not", "that")
    // Kafka producer connection properties (not ZooKeeper -- the producer talks to the brokers directly)
    val props = new Properties()
    props.put("bootstrap.servers", brokers)
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    val producer = new KafkaProducer[String, String](props)
    // send messagesPerSec messages of wordsPerMessage random words every second
    while (true) {
      (1 to messagesPerSec).foreach { messageNum =>
        val str = (1 to wordsPerMessage).map(_ => random.shuffle(names).head).mkString(" ")
        println(str)
        val message = new ProducerRecord[String, String](topic, null, str)
        producer.send(message) // asynchronous; records are batched and sent in the background
      }
      Thread.sleep(1000)
    }
  }
}
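producer.send above is fire-and-forget, so a failed send goes unnoticed. The Kafka client library's own Callback and RecordMetadata types can report each acknowledgement; here is a minimal sketch (the helper name sendWithLogging is ours, not part of the program above):

import org.apache.kafka.clients.producer.{Callback, KafkaProducer, ProducerRecord, RecordMetadata}

// Hypothetical helper: send a record and log the broker's acknowledgement or failure.
def sendWithLogging(producer: KafkaProducer[String, String], topic: String, value: String): Unit = {
  val record = new ProducerRecord[String, String](topic, null, value)
  producer.send(record, new Callback {
    override def onCompletion(metadata: RecordMetadata, exception: Exception): Unit = {
      if (exception != null) println(s"send failed: ${exception.getMessage}")
      else println(s"sent to ${metadata.topic}-${metadata.partition} at offset ${metadata.offset}")
    }
  })
}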
build.properties:
-----------------
sbt.version = 1.2.8
build.sbt dependency packages:
--------------------------------
name := "TwitInt"
version := "0.1"
scalaVersion := "2.11.12"
// https://mvnrepository.com/artifact/org.apache.spark/spark-core
libraryDependencies += "org.apache.spark" %% "spark-core" % "2.4.0"
// https://mvnrepository.com/artifact/org.apache.spark/spark-sql
libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.4.0"
// https://mvnrepository.com/artifact/org.apache.spark/spark-streaming
libraryDependencies += "org.apache.spark" %% "spark-streaming" % "2.4.0"
// https://mvnrepository.com/artifact/org.apache.bahir/spark-streaming-twitter
// the Twitter connector moved to Apache Bahir for Spark 2.x; the old org.apache.spark 1.6.3 artifact does not match Spark 2.4.0
libraryDependencies += "org.apache.bahir" %% "spark-streaming-twitter" % "2.4.0"
// https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-kafka-0-8
// the receiver-based KafkaUtils.createStream used below lives in spark-streaming-kafka-0-8 for Spark 2.x
libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka-0-8" % "2.4.0"
Kafka_Stream.scala:
-------------------
package Kafka_Stream

import org.apache.spark._
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.log4j.{Level, Logger}

object Kafka_Stream {
  def main(args: Array[String]): Unit = {
    // silence Spark's verbose logging so the word counts are readable
    Logger.getLogger("org").setLevel(Level.OFF)
    Logger.getLogger("akka").setLevel(Level.OFF)
    val conf = new SparkConf()
    conf.set("spark.master", "local[3]")
    conf.set("spark.app.name", "KafkaReceiver")
    val sc = new SparkContext(conf)
    val ssc = new StreamingContext(sc, Seconds(20)) // 20-second batches
    // receiver-based stream: (ZooKeeper quorum, consumer group, Map(topic -> receiver threads))
    // change the topic name and port number if required
    val kafkaStream = KafkaUtils.createStream(ssc, "localhost:2181", "spark-streaming-consumer-group", Map("kafka-sparkstream-test" -> 5))
    // each element is a (key, message) pair; split each message into words and count them per batch
    val words = kafkaStream.flatMap(_._2.split(" "))
    val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
    wordCounts.print() // prints the word-count result of each batch
    ssc.start()
    ssc.awaitTermination()
  }
}
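createStream is the older receiver-based API. The same spark-streaming-kafka-0-8 artifact also provides a receiver-less direct stream that connects to the broker instead of ZooKeeper; a minimal sketch, reusing the ssc from above and assuming the same local broker and topic:

import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.KafkaUtils

// direct stream: Spark tracks offsets itself; note the broker port (9092), not the ZooKeeper port (2181)
val kafkaParams = Map("metadata.broker.list" -> "localhost:9092")
val topics = Set("kafka-sparkstream-test")
val directStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)
// same (key, message) shape as the receiver-based stream
directStream.flatMap(_._2.split(" ")).map((_, 1)).reduceByKey(_ + _).print()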
Step 1:
// Make sure ZooKeeper is running:
hadoop@hadoop:/usr/local/kafka$ sh bin/zookeeper-server-start.sh config/zookeeper.properties
Step 2:
//Start the kafka server:
hadoop@hadoop:/usr/local/kafka$ sh bin/kafka-server-start.sh config/server.properties
Step 3:
// create the kafka topic
hadoop@hadoop:/usr/local/kafka/bin$ sh kafka-topics.sh --create --topic kafka-sparkstream-test --zookeeper localhost:2181 --replication-factor 1 --partitions 3
Created topic "kafka-sparkstream-test".
// list the kafka topics to verify
hadoop@hadoop:/usr/local/kafka/bin$ sh kafka-topics.sh --zookeeper localhost:2181 --list
kafka-sparkstream-test
// the topic's partition directory under the broker log dir confirms messages are being written
hadoop@hadoop:/tmp/kafka-logs/kafka-sparkstream-test-0$ ls -l
total 20488
-rw-r--r-- 1 hadoop hadoop 10485760 Feb 17 20:52 00000000000000000000.index
-rw-r--r-- 1 hadoop hadoop 2109 Feb 17 20:56 00000000000000000000.log
-rw-r--r-- 1 hadoop hadoop 10485756 Feb 17 20:52 00000000000000000000.timeindex
-rw-r--r-- 1 hadoop hadoop 8 Feb 17 20:52 leader-epoch-checkpoint
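With the producer running, the topic can also be sanity-checked without Spark using the console consumer bundled with Kafka (assuming the installation layout used in the steps above):

hadoop@hadoop:/usr/local/kafka/bin$ sh kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic kafka-sparkstream-test --from-beginning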
[IJ]sbt:TwitInt> compile
[IJ]sbt:TwitInt> package
[IJ]sbt:TwitInt> show discoveredMainClasses
[info] * IntTwit
[info] * Kafka_Stream.Kafka_Producer
[info] * Kafka_Stream.Kafka_Stream
[success] Total time: 2 s, completed 17 Feb, 2019 9:29:23 PM
The matching Maven coordinate for the Kafka connector (as listed on mvnrepository), passed to spark-shell via --packages below:
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-kafka-0-8 -->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
    <version>2.4.0</version>
</dependency>
org.apache.spark:spark-streaming-kafka-0-8_2.11:2.4.0
// start an instance of spark-shell for the producer
$ spark-shell --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.4.0
scala> :load Kafka_Producer.scala
scala> Kafka_Producer.main(null)
ok you test are How you not you that ok great you are How How
How fine How not test yes fine not Hi great great great that fine great
that Hello ok fine yes you yes you not fine that are yes Hello that
not great test you fine are are no that ok not no Hello that test
test ok that no fine that that How not great ok ok ok not Hello
// create a new instance of spark-shell for the consumer
$ spark-shell --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.4.0
scala> :load Kafka_Stream.scala
Kafka_Stream.scala:1: error: illegal start of definition
package Kafka_Stream
^
// the Scala REPL rejects package declarations, so remove or comment out the package line before :load; the rest then loads cleanly:
import org.apache.spark._
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.log4j.Logger
import org.apache.log4j.Level
defined object Kafka_Stream
scala> Kafka_Stream.main(null)
// word-count results for each 20-second batch are printed here
spark-submit --class Kafka_Stream.Kafka_Producer /home/hadoop/Desktop/vow/TwitInt/target/scala-2.11/twitint_2.11-0.1.jar
spark-submit --class Kafka_Stream.Kafka_Stream /home/hadoop/Desktop/vow/TwitInt/target/scala-2.11/twitint_2.11-0.1.jar
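Note that sbt package produces a thin jar, so the Kafka connector and client classes are not bundled into twitint_2.11-0.1.jar. One way to supply them at submit time (assuming the build.sbt versions above) is the same --packages coordinate:

spark-submit --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.4.0 --class Kafka_Stream.Kafka_Producer /home/hadoop/Desktop/vow/TwitInt/target/scala-2.11/twitint_2.11-0.1.jar
spark-submit --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.4.0 --class Kafka_Stream.Kafka_Stream /home/hadoop/Desktop/vow/TwitInt/target/scala-2.11/twitint_2.11-0.1.jar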