// Here is the sample program which supports CheckPoint Recovery in Spark Streaming
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.log4j.Logger
import org.apache.log4j.Level
/**
 * Sample Spark Streaming driver that demonstrates checkpoint recovery.
 *
 * The streaming context is obtained through `StreamingContext.getOrCreate`,
 * which is handed the checkpoint directory together with a factory function
 * that builds a fresh context when one has to be created.
 */
object RecoveryCheckPoint {

  /**
   * Entry point: configures logging and Spark, obtains the streaming context
   * and blocks until the streaming job terminates.
   *
   * @param args command-line arguments (not used)
   */
  def main(args: Array[String]): Unit = {
    // Put both the "org" and the "akka" logger hierarchies at INFO level.
    Seq("org", "akka").foreach(name => Logger.getLogger(name).setLevel(Level.INFO))

    // Spark configuration: master and application name are set as plain properties.
    val sparkConf = new SparkConf()
      .set("spark.master","local[3]")
      .set("spark.app.name","StreamingWithCheckPoint")
    val sparkContext = new SparkContext(sparkConf)

    // Location where the checkpoint data is stored.
    val checkPointDirectory = "hdfs://localhost:9000/user/myCheckPointFolder"

    /**
     * Builds a brand-new streaming context (20 second batches) together with
     * its DStream pipeline, and registers the checkpoint directory on it.
     *
     * @return the fully wired, not yet started, streaming context
     */
    def functionToCreateContext(): StreamingContext = {
      val streamingContext = new StreamingContext(sparkContext, Seconds(20))

      // Read text lines from the socket and append the two suffixes in turn.
      val decoratedLines = streamingContext
        .socketTextStream("localhost", 3456)
        .map(line => line + " Stream ")
        .map(line => line + " Streaming ")

      // The data of the final stream is checkpointed every 20 seconds.
      decoratedLines.checkpoint(Seconds(20))
      decoratedLines.print()

      streamingContext.checkpoint(checkPointDirectory)
      streamingContext
    }

    // Get the StreamingContext from checkpoint data, or create a new one via the factory.
    val context = StreamingContext.getOrCreate(checkPointDirectory, functionToCreateContext _)
    context.start()
    context.awaitTermination()
  }
}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.log4j.Logger
import org.apache.log4j.Level
/**
 * Sample Spark Streaming application with checkpoint recovery.
 *
 * The streaming context is obtained with `StreamingContext.getOrCreate`, which
 * receives the checkpoint directory and a factory used to build a new context.
 */
object RecoveryCheckPoint {

  /**
   * Configures logging and Spark, obtains the streaming context from the
   * checkpoint directory (or creates a new one), starts it and waits for
   * termination.
   *
   * @param args command-line arguments (not used)
   */
  def main(args: Array[String]): Unit = {
    Logger.getLogger("org").setLevel(Level.INFO)
    Logger.getLogger("akka").setLevel(Level.INFO)

    val conf = new SparkConf()
    conf.set("spark.master", "local[3]")
    conf.set("spark.app.name", "StreamingWithCheckPoint")
    // NOTE(review): the SparkContext is created eagerly, outside the factory below.
    // Confirm this is intended for the recovery path.
    val sc = new SparkContext(conf)

    // Setting the checkpointing location.
    val checkPointDirectory = "hdfs://localhost:9000/user/myCheckPointFolder"

    /**
     * Factory handed to `StreamingContext.getOrCreate`: builds a new streaming
     * context with 20 second batches, wires the DStream pipeline and registers
     * the checkpoint directory.
     *
     * @return the configured, not yet started, streaming context
     */
    def functionToCreateContext(): StreamingContext = {
      val ssc = new StreamingContext(sc, Seconds(20))
      val ds1 = ssc.socketTextStream("localhost", 3456)
      val ds2 = ds1.map(x => x + " Stream ")
      val ds3 = ds2.map(x => x + " Streaming ")
      // The data of ds3 gets checkpointed, at a 20 second interval.
      ds3.checkpoint(Seconds(20))
      ds3.print()
      ssc.checkpoint(checkPointDirectory)
      ssc
    }

    // Get StreamingContext from checkpoint data or create a new one.
    // Fix: this binding was missing, so `context` below was undefined and the
    // program did not compile.
    val context = StreamingContext.getOrCreate(checkPointDirectory, functionToCreateContext _)
    context.start()
    context.awaitTermination()
  }
}
Nice article, keep posting more articles with us.
Reply | Delete
Thank you...
big data and hadoop training