CheckPoint in Spark Streaming
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.log4j.Logger
import org.apache.log4j.Level
object CheckPointExa{
def main(args:Array[String]):Unit={
Logger.getLogger("org").setLevel(Level.OFF)
Logger.getLogger("akka").setLevel(Level.OFF)
val conf = new SparkConf()
conf.set("spark.master","local[2]")
conf.set("spark.app.name","CheckPointTesting")
val sc = new SparkContext(conf)
val ssc = new StreamingContext(sc,Seconds(20))
// hdfs folder location to do CheckPoint
ssc.checkpoint("hdfs://localhost:9000/user/myCheckPointFolder")
val ds1 = ssc.socketTextStream("localhost",34567)
val ds2 = ds1.map(x => x + " Stream ")
val ds3 = ds2.map(x => x + " Streaming ")
ds3.print()
//the data of ds3 gets CheckPointed
ds3.checkpoint(Seconds(20))
ssc.start()
ssc.awaitTermination()
}
}
// Input from Netcat
hadoop@hadoop:~$ nc -lk 34567
i love you
madham
// result displayed on console
-------------------------------------------
Time: 1550145480000 ms
-------------------------------------------
i love you Stream Streaming
madham Stream Streaming
hadoop@hadoop:~$ hdfs dfs -ls /user/myCheckPointFolder
Found 7 items
drwxr-xr-x - hadoop supergroup 0 2019-02-14 17:28 /user/myCheckPointFolder/b75036e7-c6c3-4315-961d-0784509938c2
drwxr-xr-x - hadoop supergroup 0 2019-02-14 17:25 /user/myCheckPointFolder/c5966d03-245f-471a-a3e0-a9a9bd01c330
-rw-r--r-- 3 hadoop supergroup 2967 2019-02-14 17:28 /user/myCheckPointFolder/checkpoint-1550145480000
-rw-r--r-- 3 hadoop supergroup 2895 2019-02-14 17:28 /user/myCheckPointFolder/checkpoint-1550145480000.bk
-rw-r--r-- 3 hadoop supergroup 2977 2019-02-14 17:28 /user/myCheckPointFolder/checkpoint-1550145500000
-rw-r--r-- 3 hadoop supergroup 2972 2019-02-14 17:28 /user/myCheckPointFolder/checkpoint-1550145500000.bk
drwxr-xr-x - hadoop supergroup 0 2019-02-14 17:27 /user/myCheckPointFolder/receivedBlockMetadata
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.log4j.Logger
import org.apache.log4j.Level
object CheckPointExa{
def main(args:Array[String]):Unit={
Logger.getLogger("org").setLevel(Level.OFF)
Logger.getLogger("akka").setLevel(Level.OFF)
val conf = new SparkConf()
conf.set("spark.master","local[2]")
conf.set("spark.app.name","CheckPointTesting")
val sc = new SparkContext(conf)
val ssc = new StreamingContext(sc,Seconds(20))
// hdfs folder location to do CheckPoint
ssc.checkpoint("hdfs://localhost:9000/user/myCheckPointFolder")
val ds1 = ssc.socketTextStream("localhost",34567)
val ds2 = ds1.map(x => x + " Stream ")
val ds3 = ds2.map(x => x + " Streaming ")
ds3.print()
//the data of ds3 gets CheckPointed
ds3.checkpoint(Seconds(20))
ssc.start()
ssc.awaitTermination()
}
}
// Input from Netcat
hadoop@hadoop:~$ nc -lk 34567
i love you
madham
// result displayed on console
-------------------------------------------
Time: 1550145480000 ms
-------------------------------------------
i love you Stream Streaming
madham Stream Streaming
// checkpoint folder created after running the program
Found 7 items
drwxr-xr-x - hadoop supergroup 0 2019-02-14 17:28 /user/myCheckPointFolder/b75036e7-c6c3-4315-961d-0784509938c2
drwxr-xr-x - hadoop supergroup 0 2019-02-14 17:25 /user/myCheckPointFolder/c5966d03-245f-471a-a3e0-a9a9bd01c330
-rw-r--r-- 3 hadoop supergroup 2967 2019-02-14 17:28 /user/myCheckPointFolder/checkpoint-1550145480000
-rw-r--r-- 3 hadoop supergroup 2895 2019-02-14 17:28 /user/myCheckPointFolder/checkpoint-1550145480000.bk
-rw-r--r-- 3 hadoop supergroup 2977 2019-02-14 17:28 /user/myCheckPointFolder/checkpoint-1550145500000
-rw-r--r-- 3 hadoop supergroup 2972 2019-02-14 17:28 /user/myCheckPointFolder/checkpoint-1550145500000.bk
drwxr-xr-x - hadoop supergroup 0 2019-02-14 17:27 /user/myCheckPointFolder/receivedBlockMetadata
No comments:
Post a Comment