NOTE: Verified working on Cloudera 5.13.
Word count over a socket text stream in spark-shell:
----------
$ spark-shell --master local[2]
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
val ssc = new StreamingContext(sc, Seconds(10))      // 10-second batch interval
val lines = ssc.socketTextStream("localhost", 9999)  // one record per line from the TCP socket
val words = lines.flatMap(x => x.split(" "))
val pair = words.map(x => (x,1))
val res = pair.reduceByKey(_+_)
res.print()
ssc.start()
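In spark-shell the prompt returns as soon as ssc.start() is called and batches keep running in the background; a standalone driver would block instead. A minimal sketch of the two usual follow-ups:

ssc.awaitTermination()                 // block the driver until the stream stops or fails
// ssc.stop(stopSparkContext = false)  // stop streaming but keep the shell's SparkContext usable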
In another terminal, feed the socket with netcat:
hadoop@hadoop:~$ nc -lk 9999
i love india
who loves pakistan
^C
-------------------------------------------
Time: 1552710680000 ms
-------------------------------------------
(love,1)
(who,1)
(india,1)
(pakistan,1)
(i,1)
(loves,1)
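The same pipeline extends to sliding windows with one extra operator; a minimal sketch, assuming the ssc and pair stream defined above (window length and slide interval must be multiples of the 10-second batch interval):

// Counts over the last 30 seconds, recomputed every 10 seconds
val windowed = pair.reduceByKeyAndWindow(_ + _, Seconds(30), Seconds(10))
windowed.print()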
The Kafka + Spark Streaming example starts from here.
start zookeeper:
hadoop@hadoop:/usr/local/kafka$ bin/zookeeper-server-start.sh config/zookeeper.properties
start kafka server:
hadoop@hadoop:/usr/local/kafka$ bin/kafka-server-start.sh config/server.properties
start spark-shell
$ spark-shell --master local[2]
hadoop@hadoop:/usr/local/kafka$ bin/kafka-topics.sh --create --topic myFresh --partitions 1 --replication-factor 1 --zookeeper localhost:2181
Created topic "myFresh".
hadoop@hadoop:/usr/local/kafka$ bin/kafka-topics.sh --list --zookeeper localhost:2181
myFresh
hadoop@hadoop:/usr/local/kafka$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic myFresh
>i love india
>i love singapore
>i love malaysia
hadoop@hadoop:/usr/local/kafka$ bin/kafka-console-consumer.sh --topic myFresh --bootstrap-server localhost:9092 --from-beginning
i love india
i love singapore
i love malaysia
In the spark-shell (or as a standalone app submitted with spark-submit):
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.log4j.{Level, Logger}
object SriRam {
  def main(args: Array[String]): Unit = {
    // Silence Spark/Akka logging so the counts are readable
    Logger.getLogger("org").setLevel(Level.OFF)
    Logger.getLogger("akka").setLevel(Level.OFF)

    val conf = new SparkConf().setAppName("SparkStreamingKafka").setMaster("local[4]")
    val sc = new SparkContext(conf)   // inside spark-shell, reuse the existing sc instead of creating a new one
    val ssc = new StreamingContext(sc, Seconds(10))

    // Receiver-based stream: (ZooKeeper quorum, consumer group, Map(topic -> receiver threads)).
    // The per-topic value is the number of consumer threads and must be at least 1.
    val kafkaStream = KafkaUtils.createStream(ssc, "localhost:2181", "testGroup", Map("myFresh" -> 1))

    // Each record is a (key, message) pair; keep the message, upper-cased
    val lines = kafkaStream.map(x => x._2.toUpperCase)
    val words = lines.flatMap(x => x.split(" "))
    val pair = words.map(x => (x, 1))
    val res = pair.reduceByKey(_ + _)

    res.print()   // prints the first few counts of every batch on the driver

    // Side-effecting alternative: emit each (word, count) pair from the executors
    res.foreachRDD(rdd =>
      rdd.foreachPartition(partition =>
        partition.foreach { case (w: String, cnt: Int) =>
          println(w + "\t" + cnt)
        }
      )
    )

    ssc.start()
    ssc.awaitTermination()
  }
}
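KafkaUtils.createStream is the receiver-based Kafka 0.8 API and needs the spark-streaming-kafka artifact on the classpath (pass it via --packages or --jars when launching spark-shell or spark-submit; the exact artifact and version depend on your Spark build). The same artifact also offers a receiver-less direct stream; a minimal sketch, assuming the same ssc and the broker used in the producer step above:

import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.KafkaUtils

// Direct stream: reads the brokers directly, no ZooKeeper receiver and no consumer-thread map
val kafkaParams = Map("metadata.broker.list" -> "localhost:9092")
val direct = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, Set("myFresh"))
direct.map(_._2).flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).print()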