Tuesday, February 19, 2019

Producer and Consumer Programming in Spark Streaming




Kafka_Producer.scala:
---------------------



package Kafka_Stream

import java.util.Properties
import scala.util.Random
import org.apache.kafka.clients.producer.{KafkaProducer,ProducerRecord}

object Kafka_Producer {
  def main(args: Array[String]): Unit = {
    val brokers = "localhost:9092"

    //change the topic name if required
    val topic = "kafka-sparkstream-test"
    val messagesPerSec = 1
    val wordsPerMessage = 15
    val random = new Random()
    val names = Seq("Hi", "Hello", "How", "are", "you", "test", "yes", "no", "fine", "great", "ok", "not", "that")

    //Kafka producer connection properties
    val props = new Properties()
    props.put("bootstrap.servers", brokers)
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

    val producer = new KafkaProducer[String, String](props)

    //send a batch of random messages every second

    while (true) {
      (1 to messagesPerSec).foreach { messageNum =>
        val str = (1 to wordsPerMessage).map(_ => names(random.nextInt(names.length))).mkString(" ")
        println(str)
        val message = new ProducerRecord[String, String](topic, null, str)
        producer.send(message)
      }
      Thread.sleep(1000)
    }
  }
}
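
// Optional: with the producer running, you can verify that messages reach the broker
// with the console consumer that ships with Kafka (flag names per recent Kafka releases;
// older releases use --zookeeper instead of --bootstrap-server):

hadoop@hadoop:/usr/local/kafka$ sh bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic kafka-sparkstream-test --from-beginning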

build.properties:
-----------------
sbt.version = 1.2.8

build.sbt dependency packages:
--------------------------------
name := "TwitInt"

version := "0.1"

scalaVersion := "2.11.12"


// https://mvnrepository.com/artifact/org.apache.spark/spark-core
libraryDependencies += "org.apache.spark" %% "spark-core" % "2.4.0"
// https://mvnrepository.com/artifact/org.apache.spark/spark-sql
libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.4.0"
// https://mvnrepository.com/artifact/org.apache.spark/spark-streaming
libraryDependencies += "org.apache.spark" %% "spark-streaming" % "2.4.0"
// https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-twitter
libraryDependencies += "org.apache.spark" %% "spark-streaming-twitter" % "1.6.3"
// https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-kafka
libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka" % "1.6.3"
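
Note: spark-streaming-kafka 1.6.3 is the older receiver-based connector that the KafkaUtils.createStream consumer below depends on. If you were using the direct (receiver-less) API on Spark 2.4 instead, the dependency would roughly be the kafka-0-10 artifact:

// alternative, direct-stream connector (not used in this walkthrough)
libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka-0-10" % "2.4.0"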





Kafka_Stream.scala:
-------------------

package Kafka_Stream
import org.apache.spark._
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.kafka.KafkaUtils

import org.apache.log4j.Logger
import org.apache.log4j.Level

object Kafka_Stream{
  def main(args:Array[String]):Unit={
    Logger.getLogger("org").setLevel(Level.OFF)
    Logger.getLogger("akka").setLevel(Level.OFF)

    val conf = new SparkConf()
    conf.set("spark.master","local[3]")
    conf.set("spark.app.name","KafkaReceiver")
    val sc = new SparkContext(conf)
    val ssc = new StreamingContext(sc,Seconds(20))

    //change the topic name and ZooKeeper port if required
    val kafkaStream = KafkaUtils.createStream(ssc, "localhost:2181", "spark-streaming-consumer-group", Map("kafka-sparkstream-test" -> 5))

    val words = kafkaStream.flatMap(x => x._2.split(" "))
    val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)

    // kafkaStream.print() // prints the stream of (key, message) pairs received


    wordCounts.print() // prints the wordcount result of the streaming


    ssc.start()
    ssc.awaitTermination()
  }
}
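
For comparison, here is a minimal sketch of the same wordcount written against the newer direct-stream API from spark-streaming-kafka-0-10 mentioned above. The object name Kafka_Direct_Stream and the kafkaParams values are illustrative assumptions, not part of the project:

package Kafka_Stream

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

object Kafka_Direct_Stream {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[3]").setAppName("KafkaDirectReceiver")
    val ssc = new StreamingContext(conf, Seconds(20))

    // consumer settings; the direct API talks to the brokers, not to ZooKeeper
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "localhost:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "spark-streaming-consumer-group",
      "auto.offset.reset" -> "latest"
    )

    val stream = KafkaUtils.createDirectStream[String, String](
      ssc, PreferConsistent, Subscribe[String, String](Array("kafka-sparkstream-test"), kafkaParams))

    // same wordcount as above, but records expose key/value accessors
    val wordCounts = stream.flatMap(r => r.value.split(" ")).map(w => (w, 1)).reduceByKey(_ + _)
    wordCounts.print()

    ssc.start()
    ssc.awaitTermination()
  }
}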

Step 1:

//Make sure zookeeper is running:

hadoop@hadoop:/usr/local/kafka$ sh bin/zookeeper-server-start.sh config/zookeeper.properties


Step 2:
//Start the kafka server:

hadoop@hadoop:/usr/local/kafka$ sh bin/kafka-server-start.sh config/server.properties
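
// optional sanity check: jps should list QuorumPeerMain (ZooKeeper) and Kafka

hadoop@hadoop:/usr/local/kafka$ jps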



Step 3:
// create kafka topic


hadoop@hadoop:/usr/local/kafka/bin$ sh kafka-topics.sh --create --topic kafka-sparkstream-test --zookeeper localhost:2181 --replication-factor 1 --partitions 3

Created topic "kafka-sparkstream-test".

// list the kafka topics to verify

hadoop@hadoop:/usr/local/kafka/bin$ sh kafka-topics.sh --zookeeper localhost:2181 --list
kafka-sparkstream-test
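
// optionally describe the topic to inspect its partitions and replicas

hadoop@hadoop:/usr/local/kafka/bin$ sh kafka-topics.sh --zookeeper localhost:2181 --describe --topic kafka-sparkstream-test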

hadoop@hadoop:/tmp/kafka-logs/kafka-sparkstream-test-0$ ls -l
total 20488
-rw-r--r-- 1 hadoop hadoop 10485760 Feb 17 20:52 00000000000000000000.index
-rw-r--r-- 1 hadoop hadoop     2109 Feb 17 20:56 00000000000000000000.log
-rw-r--r-- 1 hadoop hadoop 10485756 Feb 17 20:52 00000000000000000000.timeindex
-rw-r--r-- 1 hadoop hadoop        8 Feb 17 20:52 leader-epoch-checkpoint


[IJ]sbt:TwitInt> compile

[IJ]sbt:TwitInt> package

[IJ]sbt:TwitInt> show discoveredMainClasses
[info] * IntTwit
[info] * Kafka_Stream.Kafka_Producer
[info] * Kafka_Stream.Kafka_Stream
[success] Total time: 2 s, completed 17 Feb, 2019 9:29:23 PM


<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-kafka -->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka_2.11</artifactId>
    <version>1.6.3</version>
</dependency>
org.apache.spark:spark-streaming-kafka_2.11:1.6.3



//start an instance of spark-shell

$ spark-shell --packages org.apache.spark:spark-streaming-kafka_2.11:1.6.3

scala> :load Kafka_Producer.scala

scala> Kafka_Producer.main(null)

ok you test are How you not you that ok great you are How How
How fine How not test yes fine not Hi great great great that fine great
that Hello ok fine yes you yes you not fine that are yes Hello that
not great test you fine are are no that ok not no Hello that test
test ok that no fine that that How not great ok ok ok not Hello


//create a new instance of spark-shell for the consumer

$ spark-shell --packages org.apache.spark:spark-streaming-kafka_2.11:1.6.3

scala> :load Kafka_Stream.scala

Loading Kafka_Stream.scala…

Kafka_Stream.scala:1: error: illegal start of definition
package Kafka_Stream
^
import org.apache.spark._
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.log4j.Logger
import org.apache.log4j.Level
defined object Kafka_Stream

// The "illegal start of definition" error is harmless: package declarations are
// not allowed in the REPL, so :load skips that line and compiles the rest of the file.

scala> Kafka_Stream.main(null)

// the wordcount result of each 20-second batch is printed here.

spark-submit --class Kafka_Stream.Kafka_Producer /home/hadoop/Desktop/vow/TwitInt/target/scala-2.11/twitint_2.11-0.1.jar

spark-submit --class Kafka_Stream.Kafka_Stream /home/hadoop/Desktop/vow/TwitInt/target/scala-2.11/twitint_2.11-0.1.jar
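
// Note: sbt package does not bundle dependencies into the jar, so if spark-submit
// fails with missing Kafka classes, pass the same connector package used for spark-shell:

spark-submit --packages org.apache.spark:spark-streaming-kafka_2.11:1.6.3 --class Kafka_Stream.Kafka_Stream /home/hadoop/Desktop/vow/TwitInt/target/scala-2.11/twitint_2.11-0.1.jar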
