Tuesday, February 19, 2019

CheckPoint in Spark Streaming

CheckPoint in Spark Streaming



import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.log4j.Logger
import org.apache.log4j.Level

object CheckPointExa{
  def main(args:Array[String]):Unit={
    Logger.getLogger("org").setLevel(Level.OFF)
    Logger.getLogger("akka").setLevel(Level.OFF)
    val conf = new SparkConf()
    conf.set("spark.master","local[2]")
    conf.set("spark.app.name","CheckPointTesting")

    val sc = new SparkContext(conf)
    val ssc = new StreamingContext(sc,Seconds(20))

// hdfs folder location to do CheckPoint

    ssc.checkpoint("hdfs://localhost:9000/user/myCheckPointFolder")
    val ds1 = ssc.socketTextStream("localhost",34567)
    val ds2 = ds1.map(x => x + " Stream ")
    val ds3 = ds2.map(x => x + " Streaming ")
        ds3.print()
 
    //the data of ds3 gets CheckPointed
    ds3.checkpoint(Seconds(20))
    ssc.start()
    ssc.awaitTermination()
  }
}


// Input from Netcat
hadoop@hadoop:~$ nc -lk 34567
i love you
madham




// result displayed on console
-------------------------------------------
Time: 1550145480000 ms
-------------------------------------------
i love you Stream  Streaming
madham Stream  Streaming


// checkpoint folder created after running the program

hadoop@hadoop:~$ hdfs dfs -ls /user/myCheckPointFolder
Found 7 items
drwxr-xr-x   - hadoop supergroup          0 2019-02-14 17:28 /user/myCheckPointFolder/b75036e7-c6c3-4315-961d-0784509938c2
drwxr-xr-x   - hadoop supergroup          0 2019-02-14 17:25 /user/myCheckPointFolder/c5966d03-245f-471a-a3e0-a9a9bd01c330
-rw-r--r--   3 hadoop supergroup       2967 2019-02-14 17:28 /user/myCheckPointFolder/checkpoint-1550145480000
-rw-r--r--   3 hadoop supergroup       2895 2019-02-14 17:28 /user/myCheckPointFolder/checkpoint-1550145480000.bk
-rw-r--r--   3 hadoop supergroup       2977 2019-02-14 17:28 /user/myCheckPointFolder/checkpoint-1550145500000
-rw-r--r--   3 hadoop supergroup       2972 2019-02-14 17:28 /user/myCheckPointFolder/checkpoint-1550145500000.bk
drwxr-xr-x   - hadoop supergroup          0 2019-02-14 17:27 /user/myCheckPointFolder/receivedBlockMetadata


No comments:

Post a Comment

Python Challenges Program

Challenges program: program 1: #Input :ABAABBCA #Output: A4B3C1 str1="ABAABBCA" str2="" d={} for x in str1: d[x]=d...