Wordcount program using Spark Streaming
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
import org.apache.log4j.Logger
import org.apache.log4j.Level
// we are going to build a new SparkConf, so stop the SparkContext that spark-shell already created
sc.stop()
Logger.getLogger("org").setLevel(Level.OFF)
Logger.getLogger("akka").setLevel(Level.OFF)
val conf = new SparkConf().setMaster("local[4]").setAppName("wordcount")
val ssc = new StreamingContext(conf, Seconds(30))
val data = ssc.socketTextStream("localhost", 7890)
val wc = data.flatMap(_.split(" ")).map(x => (x, 1)).reduceByKey(_ + _)
wc.print()
wc.saveAsTextFiles("hdfs://localhost:9000/user/streamingexa/ex")
ssc.start()
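In spark-shell the REPL keeps the driver alive, so ssc.start() is enough to keep the stream running. In a standalone application you would block the main thread after starting and shut down cleanly; a minimal sketch:
ssc.start()
ssc.awaitTermination()  // block until the streaming job stops or fails
// to stop from another thread, letting in-flight batches finish:
// ssc.stop(stopSparkContext = true, stopGracefully = true)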
// in another terminal, start netcat and type some lines
hadoop@hadoop:/$ nc -lk 7890
i love all beautiful things
god i love all beautiful
god i love
god i
// the result is printed on the console
-------------------------------------------
Time: 1549431720000 ms
-------------------------------------------
(god,3)
(i,4)
(all,2)
(beautiful,2)
(things,1)
(love,3)
-------------------------------------------
// the result is written to HDFS
hadoop@hadoop:/$ hdfs dfs -ls hdfs://localhost:9000/user/streamingexa/
Found 1 items
drwxr-xr-x - hadoop supergroup 0 2019-02-06 11:12 hdfs://localhost:9000/user/streamingexa/ex-1549431720000
hadoop@hadoop:/$ hdfs dfs -ls hdfs://localhost:9000/user/streamingexa/ex-1549431720000
Found 5 items
-rw-r--r-- 3 hadoop supergroup 0 2019-02-06 11:12 hdfs://localhost:9000/user/streamingexa/ex-1549431720000/_SUCCESS
-rw-r--r-- 3 hadoop supergroup 8 2019-02-06 11:12 hdfs://localhost:9000/user/streamingexa/ex-1549431720000/part-00000
-rw-r--r-- 3 hadoop supergroup 39 2019-02-06 11:12 hdfs://localhost:9000/user/streamingexa/ex-1549431720000/part-00001
-rw-r--r-- 3 hadoop supergroup 9 2019-02-06 11:12 hdfs://localhost:9000/user/streamingexa/ex-1549431720000/part-00002
-rw-r--r-- 3 hadoop supergroup 0 2019-02-06 11:12 hdfs://localhost:9000/user/streamingexa/ex-1549431720000/part-00003
hadoop@hadoop:/$ hdfs dfs -cat hdfs://localhost:9000/user/streamingexa/ex-1549431720000/part-00001
(i,4)
(all,2)
(beautiful,2)
(things,1)
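A note on the layout above: saveAsTextFiles writes one directory per batch, named <prefix>-<batchTimeMs>, containing one part file per partition of the batch RDD (four here, since the master is local[4]; some may be empty). For small streams you can cut this down by repartitioning first; a sketch, assuming each batch is small enough for a single partition:
// one part file per batch; directories become .../ex-<batchTimeMs>.txt
wc.repartition(1).saveAsTextFiles("hdfs://localhost:9000/user/streamingexa/ex", "txt")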
Another approach, reusing the SparkContext that spark-shell provides (no sc.stop needed):
---------------------------------------------------------------
hadoop@hadoop:~$ spark-shell --master local[2]
import org.apache.spark.streaming._
val ssc = new StreamingContext(sc,Seconds(15))
val d1 = ssc.socketTextStream("localhost",1234)
val d2 = d1.flatMap(x => x.split(" ")).map(x => (x, 1)).reduceByKey((x, y) => x + y)
d2.print()
sc.setLogLevel("INFO")
ssc.start()
hadoop@hadoop:~$ nc -lk 1234
hadoop is batch processing
-------------------------------------------
Time: 1549458165000 ms
-------------------------------------------
(is,1)
(batch,1)
(hadoop,1)
(processing,1)
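Note that these counts are per batch: every 15-second batch is counted on its own and earlier batches are forgotten. To keep a running total across batches, the DStream API offers updateStateByKey, which requires a checkpoint directory; a minimal sketch (these lines would go before ssc.start(); the checkpoint path is illustrative):
ssc.checkpoint("hdfs://localhost:9000/user/streamingexa/checkpoint")  // illustrative path
val totals = d2.updateStateByKey { (newCounts: Seq[Int], state: Option[Int]) =>
  Some(newCounts.sum + state.getOrElse(0))  // add this batch's count to the running total
}
totals.print()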
import org.apache.spark.sql.SQLContext
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.functions._

object Streaming1 {
  def main(args: Array[String]): Unit = {
    // create a configuration object
    val conf = new SparkConf()
    conf.set("spark.master", "local[2]")
    conf.set("spark.app.name", "streamingApp1")
    conf.set("spark.streaming.blockInterval", "1000ms")
    // create the SparkContext
    val sc = new SparkContext(conf)
    // create the SQLContext; its implicits enable the toDS() call below
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._
    // create the StreamingContext with a 2-second batch interval
    val ssc = new StreamingContext(sc, Seconds(2))
    // receiver port
    val d1 = ssc.socketTextStream("localhost", 3456)
    // turn each RDD of the DStream into a Dataset and aggregate it
    d1.foreachRDD { rdd =>
      val ds1 = rdd.toDS()
      val ds2 = ds1.flatMap(line => line.split(" "))
      val ds3 = ds2.map(word => (word, 1))
      ds3.groupBy("_1").agg(count("_2")).show()
    }
    ssc.start()
    ssc.awaitTermination()
  }
}
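The _1 and count(_2) column names come from the tuple encoder. An equivalent sketch with an explicit column name, assuming the same sqlContext.implicits are in scope:
d1.foreachRDD { rdd =>
  val words = rdd.flatMap(_.split(" ")).toDF("word")  // one row per word
  words.groupBy("word").count().show()
}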
// start netcat on port 3456 and type some lines
hadoop@hadoop:~/Desktop/vow$ nc -lk 3456
god hath no better prize
love is love
love is life
life is love
// start spark-shell
$ spark-shell --master local[2]
// stop the existing SparkContext (Streaming1 builds its own from a new SparkConf)
scala> sc.stop
// load the Scala file that defines Streaming1
scala> :load streamingDF.sc
// call the main method
scala> Streaming1.main(null)
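Streaming1.main(null) works here only because main never reads its args; passing an empty array is safer in general:
scala> Streaming1.main(Array.empty[String])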
// result
+------+---------+
| _1|count(_2)|
+------+---------+
| love| 4|
| life| 2|
| is| 3|
| hath| 1|
| god| 1|
|better| 1|
| no| 1|
| prize| 1|
+------+---------+