textFileStream in Spark Streaming Example with IntelliJ IDEA
build.properties:
-----------------
sbt.version = 1.2.1
build.sbt:
---------
// Project coordinates.
name := "SparkSampleProgram"
version := "0.1"
scalaVersion := "2.11.12"

// Every Spark module below is pinned to the same release, so the version
// string is declared once and shared.
val sparkVersion = "2.4.0"

// Artifacts (see https://mvnrepository.com/artifact/org.apache.spark):
// spark-core, spark-sql and spark-streaming.
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % sparkVersion,
  "org.apache.spark" %% "spark-sql" % sparkVersion,
  "org.apache.spark" %% "spark-streaming" % sparkVersion
)
StreamingExa:
---------------
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.log4j.Logger
import org.apache.log4j.Level
/**
 * Spark Streaming example: monitors a directory with `textFileStream` and
 * prints the contents of each 20-second batch to the console.
 *
 * Usage: `StreamingExa [inputDirectory]`
 *
 * When no argument is given, the directory defaults to
 * `hdfs://localhost:9000/user/StreamingData`.
 */
object StreamingExa {

  /** Directory monitored when no path is passed on the command line. */
  private val DefaultInputDirectory = "hdfs://localhost:9000/user/StreamingData"

  /**
   * Starts the streaming job and blocks until it terminates.
   *
   * @param args optional; `args(0)` is the directory to monitor
   */
  def main(args: Array[String]): Unit = {
    // Turn off log4j output from the "org" and "akka" loggers so that the
    // batch output printed below is not buried in log lines.
    Logger.getLogger("org").setLevel(Level.OFF)
    Logger.getLogger("akka").setLevel(Level.OFF)

    val inputDirectory = args.headOption.getOrElse(DefaultInputDirectory)

    val conf = new SparkConf()
    // Use local[2] only as a fallback: a master supplied externally (for
    // example by spark-submit) must not be overridden by the program.
    conf.setIfMissing("spark.master", "local[2]")
    conf.set("spark.app.name", "sparkstreamingHDFS")

    val sc = new SparkContext(conf)
    val ssc = new StreamingContext(sc, Seconds(20))

    // Alternative sources tried while developing this example:
    //val ds1 = ssc.socketTextStream("localhost",3456)
    //val ds1 = ssc.textFileStream("E:\\StreamingExa\\")
    val ds1 = ssc.textFileStream(inputDirectory)
    ds1.print()

    ssc.start()
    ssc.awaitTermination()
  }
}
hadoop@hadoop:~/Desktop/vow$ hdfs dfs -mkdir /user/StreamingData
hadoop@hadoop:~/Desktop/vow$ hdfs dfs -copyFromLocal emp_data-1.txt /user/StreamingData
hadoop@hadoop:~/Desktop/vow$ hdfs dfs -copyFromLocal B.txt /user/StreamingData
hadoop@hadoop:~/Desktop/vow$ hdfs dfs -put airports11.txt /user/StreamingData
Result:
-------------------------------------------
Time: 1550062260000 ms
-------------------------------------------
-------------------------------------------
Time: 1550062280000 ms
-------------------------------------------
1,accounts
3,science
4,statistics
8,agriculture
9,finance
-------------------------------------------
Time: 1550062300000 ms
-------------------------------------------
airports.text columns:
----------------------
AirportID,Name,City,Country,FAA,ICAO,Latitude,Longitude,Altitude,Timezone,DST,Tz
build.properties:
-----------------
sbt.version = 1.2.1
build.sbt:
---------
// Project coordinates.
name := "SparkSampleProgram"
version := "0.1"
scalaVersion := "2.11.12"

// Every Spark module below is pinned to the same release, so the version
// string is declared once and shared.
val sparkVersion = "2.4.0"

// Artifacts (see https://mvnrepository.com/artifact/org.apache.spark):
// spark-core, spark-sql and spark-streaming.
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % sparkVersion,
  "org.apache.spark" %% "spark-sql" % sparkVersion,
  "org.apache.spark" %% "spark-streaming" % sparkVersion
)
StreamingExa:
---------------
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.log4j.Logger
import org.apache.log4j.Level
/**
 * Spark Streaming example: monitors a directory with `textFileStream` and
 * prints the contents of each 20-second batch to the console.
 *
 * Usage: `StreamingExa [inputDirectory]`
 *
 * When no argument is given, the directory defaults to
 * `hdfs://localhost:9000/user/StreamingData`.
 */
object StreamingExa {

  /** Directory monitored when no path is passed on the command line. */
  private val DefaultInputDirectory = "hdfs://localhost:9000/user/StreamingData"

  /**
   * Starts the streaming job and blocks until it terminates.
   *
   * @param args optional; `args(0)` is the directory to monitor
   */
  def main(args: Array[String]): Unit = {
    // Turn off log4j output from the "org" and "akka" loggers so that the
    // batch output printed below is not buried in log lines.
    Logger.getLogger("org").setLevel(Level.OFF)
    Logger.getLogger("akka").setLevel(Level.OFF)

    val inputDirectory = args.headOption.getOrElse(DefaultInputDirectory)

    val conf = new SparkConf()
    // Use local[2] only as a fallback: a master supplied externally (for
    // example by spark-submit) must not be overridden by the program.
    conf.setIfMissing("spark.master", "local[2]")
    conf.set("spark.app.name", "sparkstreamingHDFS")

    val sc = new SparkContext(conf)
    val ssc = new StreamingContext(sc, Seconds(20))

    // Alternative sources tried while developing this example:
    //val ds1 = ssc.socketTextStream("localhost",3456)
    //val ds1 = ssc.textFileStream("E:\\StreamingExa\\")
    val ds1 = ssc.textFileStream(inputDirectory)
    ds1.print()

    ssc.start()
    ssc.awaitTermination()
  }
}
// Open a terminal and run the following commands
// Here I copied some sample files from the local Linux filesystem to HDFS
hadoop@hadoop:~/Desktop/vow$ hdfs dfs -copyFromLocal emp_data-1.txt /user/StreamingData
hadoop@hadoop:~/Desktop/vow$ hdfs dfs -copyFromLocal B.txt /user/StreamingData
hadoop@hadoop:~/Desktop/vow$ hdfs dfs -put airports11.txt /user/StreamingData
Result:
-------------------------------------------
Time: 1550062260000 ms
-------------------------------------------
-------------------------------------------
Time: 1550062280000 ms
-------------------------------------------
1,accounts
3,science
4,statistics
8,agriculture
9,finance
-------------------------------------------
Time: 1550062300000 ms
-------------------------------------------
airports.text columns:
----------------------
AirportID,Name,City,Country,FAA,ICAO,Latitude,Longitude,Altitude,Timezone,DST,Tz
No comments:
Post a Comment