Tuesday, February 19, 2019

Spark Streaming with CheckPoint Recovery Example

// Here is a sample program that demonstrates checkpoint recovery in Spark Streaming

import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.log4j.Logger
import org.apache.log4j.Level


/** Spark Streaming sample that obtains its StreamingContext through
  * `StreamingContext.getOrCreate`, so a restarted driver can resume from the
  * checkpoint directory instead of starting from scratch.
  *
  * The job reads text lines from a socket on localhost:3456, appends two
  * suffixes to every line and prints each batch.
  */
object RecoveryCheckPoint{
  /** Program entry point.
    *
    * @param args command-line arguments (not used)
    */
  def main(args:Array[String]): Unit={
    Logger.getLogger("org").setLevel(Level.INFO)
    Logger.getLogger("akka").setLevel(Level.INFO)

    // setIfMissing (rather than set) makes these values defaults only: a master
    // or application name supplied by the launcher (e.g. spark-submit --master /
    // --name) is no longer overwritten by the hard-coded local settings.
    val conf = new SparkConf()
      .setIfMissing("spark.master","local[3]")
      .setIfMissing("spark.app.name","StreamingWithCheckPoint")

    val sc = new SparkContext(conf)

    // Location where the checkpoint data is written and later read back from
    val checkPointDirectory = "hdfs://localhost:9000/user/myCheckPointFolder"

    /** Builds a brand-new StreamingContext together with the whole DStream
      * pipeline. It is handed to `StreamingContext.getOrCreate` below as the
      * factory for the "create a new one" case.
      *
      * @return the configured (not yet started) StreamingContext
      */
    def functionToCreateContext() : StreamingContext ={
      val ssc = new StreamingContext(sc,Seconds(20))
      val ds1 = ssc.socketTextStream("localhost",3456)
      val ds2 = ds1.map(x => x + " Stream ")
      val ds3 = ds2.map(x => x + " Streaming ")

      // Checkpoint the data of ds3 every 20 seconds (same as the batch interval)
      ds3.checkpoint(Seconds(20))
      ds3.print()

      // Enable checkpointing for the context itself, using the directory above
      ssc.checkpoint(checkPointDirectory)
      ssc
    }

    // Get the StreamingContext from checkpoint data or create a new one
    val context = StreamingContext.getOrCreate(checkPointDirectory,functionToCreateContext _)

    context.start()
    // Block the main thread until the streaming computation terminates
    context.awaitTermination()
  }
}

1 comment:

Python Challenges Program

Challenges program: program 1: #Input :ABAABBCA #Output: A4B3C1 str1="ABAABBCA" str2="" d={} for x in str1: d[x]=d...