Saturday, March 16, 2019

Kafka and Spark Streaming Word Count Operation: A Flyover

NOTE: This was verified on Cloudera CDH 5.13.


SriRam:
----------

$ spark-shell --master local[2]

import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._

// Batch interval of 10 seconds; spark-shell already provides sc
val ssc = new StreamingContext(sc, Seconds(10))
// Read lines from a TCP socket fed by netcat (nc -lk 9999)
val lines = ssc.socketTextStream("localhost", 9999)
// Split each line into words, pair each word with 1, and sum per word
val words = lines.flatMap(x => x.split(" "))
val pair = words.map(x => (x, 1))
val res = pair.reduceByKey(_ + _)
res.print()
ssc.start()
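
ssc.start() returns immediately and the stream keeps running in the background of the shell. To stop it later without losing the shell's SparkContext, something like the following works (the 60-second wait is an arbitrary choice, not from the original):

// Wait up to 60 seconds for streaming output, then stop.
// stopSparkContext = false keeps the shell's sc usable afterwards;
// stopGracefully = true lets in-flight batches finish first.
ssc.awaitTerminationOrTimeout(60000)
ssc.stop(stopSparkContext = false, stopGracefully = true)

Feed the stream from another terminal with netcat: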


hadoop@hadoop:~$ nc -lk 9999
i love india
who loves pakistan
^C

-------------------------------------------
Time: 1552710680000 ms
-------------------------------------------
(love,1)
(who,1)
(india,1)
(pakistan,1)
(i,1)
(loves,1)
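
reduceByKey counts each 10-second batch independently, so every batch restarts from zero. To keep a running total across batches, updateStateByKey can carry the counts forward; it needs a checkpoint directory and must be wired up before ssc.start(), so it belongs in a fresh shell session. A minimal sketch (the checkpoint path is an arbitrary example, not from the original):

// Checkpointing is mandatory for stateful transformations
ssc.checkpoint("/tmp/streaming-checkpoint")  // hypothetical local path

// Fold each batch's counts into the running total carried in state
val updateFunc = (newCounts: Seq[Int], state: Option[Int]) =>
  Some(newCounts.sum + state.getOrElse(0))

val totals = pair.updateStateByKey(updateFunc)
totals.print()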

The Kafka + Spark Streaming version starts from here.

Start ZooKeeper:

hadoop@hadoop:/usr/local/kafka$ bin/zookeeper-server-start.sh config/zookeeper.properties

Start the Kafka server:

hadoop@hadoop:/usr/local/kafka$ bin/kafka-server-start.sh config/server.properties


Create a Kafka topic:

hadoop@hadoop:/usr/local/kafka$ bin/kafka-topics.sh --create --topic myFresh --partitions 1 --replication-factor 1 --zookeeper localhost:2181
Created topic "myFresh".

List the topics to confirm:

hadoop@hadoop:/usr/local/kafka$ bin/kafka-topics.sh --list --zookeeper localhost:2181

myFresh

Publish a few test messages with the console producer:

hadoop@hadoop:/usr/local/kafka$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic myFresh
>i love india
>i love singapore
>i love malaysia

Read them back with the console consumer:

hadoop@hadoop:/usr/local/kafka$ bin/kafka-console-consumer.sh --topic myFresh --bootstrap-server localhost:9092 --from-beginning
i love india
i love singapore
i love malaysia
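
The console producer is enough for this demo, but the same messages can also be published from code. A minimal sketch using Kafka's Java producer client from Scala (assumes the kafka-clients jar is on the classpath; the broker and topic mirror the commands above):

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

val props = new Properties()
props.put("bootstrap.servers", "localhost:9092")
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

// Send one message to the myFresh topic, then release the client's resources
val producer = new KafkaProducer[String, String](props)
producer.send(new ProducerRecord[String, String]("myFresh", "i love india"))
producer.close()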

Now start spark-shell for the Kafka word count (add the spark-streaming-kafka jar with --jars if it is not already on the classpath):

$ spark-shell --master local[2]

import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.log4j.{Logger, Level}

Paste the job below. In spark-shell, stop the shell's own context first (sc.stop()) and then call SriRam.main(Array()); with spark-submit it runs as-is.

object SriRam {
  def main(args: Array[String]): Unit = {

    // Silence framework logging so only the word counts show up
    Logger.getLogger("org").setLevel(Level.OFF)
    Logger.getLogger("akka").setLevel(Level.OFF)

    val conf = new SparkConf().setAppName("SparkStreamingJson").setMaster("local[4]")
    val sc = new SparkContext(conf)
    val ssc = new StreamingContext(sc, Seconds(10))

    // Receiver-based Kafka stream: (ssc, ZooKeeper quorum, consumer group, Map(topic -> receiver threads)).
    // The thread count must be at least 1; 0 would start no consumer threads.
    val kafkaStream = KafkaUtils.createStream(ssc, "localhost:2181", "testGroup", Map("myFresh" -> 1))

    // Each record is a (key, message) pair; keep the message and upper-case it
    val lines = kafkaStream.map(x => x._2.toUpperCase)
    val words = lines.flatMap(x => x.split(" "))
    val pair = words.map(x => (x, 1))
    val res = pair.reduceByKey(_ + _)

    println("------------started--------")
    res.print()

    // Also emit each (word, count) pair tab-separated, one partition at a time
    res.foreachRDD { rdd =>
      rdd.foreachPartition { partition =>
        partition.foreach {
          case (w: String, cnt: Int) => println(w + "\t" + cnt)
        }
      }
    }

    ssc.start()
    ssc.awaitTermination()
  }
}
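
createStream above is the older receiver-based API. The same spark-streaming-kafka 0.8 artifact also offers a direct (receiver-less) stream that reads offsets from the brokers instead of ZooKeeper. A minimal sketch with the same ssc and topic (not part of the original run):

import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.KafkaUtils

// Direct stream: talks to the broker directly, no receiver threads or ZooKeeper group
val kafkaParams = Map[String, String]("metadata.broker.list" -> "localhost:9092")
val directStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, Set("myFresh"))

// Records are (key, message) pairs here as well
val directCounts = directStream.map(_._2)
  .flatMap(_.split(" "))
  .map((_, 1))
  .reduceByKey(_ + _)
directCounts.print()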



