Streaming with Windowing in Spark with Scala
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}
object StreamingExa{
def main(args:Array[String]):Unit = {
val conf = new SparkConf()
conf.set("spark.master","local[2]")
conf.set("spark.app.name","windowing")
conf.set("spark.streaming.blockInterval","500ms")
val sc = new SparkContext(conf)
val ssc = new StreamingContext(sc,Seconds(20))
val ds1 = ssc.socketTextStream("localhost",4444)
// Windowing applied here
val ds2 = ds1.window(Seconds(40))
ds2.print()
ssc.start()
ssc.awaitTermination()
}
}
hadoop@hadoop:~/Desktop/vow$ nc -lk 4444
1 1 1 1
2 2 2 2 2
3 3 3 3 3 3 -------- 1st batch input
4 4 4 4 4 4 4
5 5 5 5 5 5 5 5 -- 2nd batch input
a a a a
b b b b b -- 3rd batch input
c c c c
d d d -- 4th batch input
Output:
Batch : #1
Time: 1549958000000 ms
-------------------------------------------
1 1 1 1
2 2 2 2 2
3 3 3 3 3 3
Batch : #2
-------------------------------------------
Time: 1549958020000 ms
-------------------------------------------
1 1 1 1
2 2 2 2 2
3 3 3 3 3 3
4 4 4 4 4 4 4
5 5 5 5 5 5 5 5
Batch : #3
-------------------------------------------
Time: 1549958040000 ms
-------------------------------------------
4 4 4 4 4 4 4
5 5 5 5 5 5 5 5
a a a a
b b b b b
Batch : #4
-------------------------------------------
Time: 1549958060000 ms
-------------------------------------------
a a a a
b b b b b
c c c c
d d d
Batch : #5
-------------------------------------------
Time: 1549958080000 ms
-------------------------------------------
c c c c
d d d
import org.apache.spark.{SparkConf, SparkContext}
object StreamingExa{
def main(args:Array[String]):Unit = {
val conf = new SparkConf()
conf.set("spark.master","local[2]")
conf.set("spark.app.name","windowing")
conf.set("spark.streaming.blockInterval","500ms")
val sc = new SparkContext(conf)
val ssc = new StreamingContext(sc,Seconds(20))
val ds1 = ssc.socketTextStream("localhost",4444)
// Windowing applied here
val ds2 = ds1.window(Seconds(40))
ds2.print()
ssc.start()
ssc.awaitTermination()
}
}
hadoop@hadoop:~/Desktop/vow$ nc -lk 4444
1 1 1 1
2 2 2 2 2
3 3 3 3 3 3 -------- 1st batch input
4 4 4 4 4 4 4
5 5 5 5 5 5 5 5 -- 2nd batch input
a a a a
b b b b b -- 3rd batch input
c c c c
d d d -- 4th batch input
Output:
Batch : #1
Time: 1549958000000 ms
-------------------------------------------
1 1 1 1
2 2 2 2 2
3 3 3 3 3 3
Batch : #2
-------------------------------------------
Time: 1549958020000 ms
-------------------------------------------
1 1 1 1
2 2 2 2 2
3 3 3 3 3 3
4 4 4 4 4 4 4
5 5 5 5 5 5 5 5
Batch : #3
-------------------------------------------
Time: 1549958040000 ms
-------------------------------------------
4 4 4 4 4 4 4
5 5 5 5 5 5 5 5
a a a a
b b b b b
Batch : #4
-------------------------------------------
Time: 1549958060000 ms
-------------------------------------------
a a a a
b b b b b
c c c c
d d d
Batch : #5
-------------------------------------------
Time: 1549958080000 ms
-------------------------------------------
c c c c
d d d
No comments:
Post a Comment