Tuesday, February 19, 2019

Wordcount program using Spark Streaming

import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
import org.apache.log4j.Logger
import org.apache.log4j.Level

// We are going to create a new SparkConf, so stop the existing SparkContext first (in spark-shell: sc.stop)


 val conf = new SparkConf().setMaster("local[4]").setAppName("workdcount")
 val ssc = new StreamingContext(conf,Seconds(30))
 val data = ssc.socketTextStream("localhost",7890)
 val wc = data.flatMap(_.split(" ")).map (x => (x,1)).reduceByKey(_+_)

// start netcat and type something there
hadoop@hadoop:/$ nc -lk 7890
i love all beautiful things
god i love all beautiful
god i love
god i

 // result is printed on console

Time: 1549431720000 ms


 // result is written to HDFS
hadoop@hadoop:/$ hdfs dfs -ls hdfs://localhost:9000/user/streamingexa/
Found 1 items
drwxr-xr-x   - hadoop supergroup          0 2019-02-06 11:12 hdfs://localhost:9000/user/streamingexa/ex-1549431720000
hadoop@hadoop:/$ hdfs dfs -ls hdfs://localhost:9000/user/streamingexa/ex-1549431720000
Found 5 items
-rw-r--r--   3 hadoop supergroup          0 2019-02-06 11:12 hdfs://localhost:9000/user/streamingexa/ex-1549431720000/_SUCCESS
-rw-r--r--   3 hadoop supergroup          8 2019-02-06 11:12 hdfs://localhost:9000/user/streamingexa/ex-1549431720000/part-00000
-rw-r--r--   3 hadoop supergroup         39 2019-02-06 11:12 hdfs://localhost:9000/user/streamingexa/ex-1549431720000/part-00001
-rw-r--r--   3 hadoop supergroup          9 2019-02-06 11:12 hdfs://localhost:9000/user/streamingexa/ex-1549431720000/part-00002
-rw-r--r--   3 hadoop supergroup          0 2019-02-06 11:12 hdfs://localhost:9000/user/streamingexa/ex-1549431720000/part-00003
hadoop@hadoop:/$ hdfs dfs -cat hdfs://localhost:9000/user/streamingexa/ex-1549431720000/part-00001

One more approach without stopping SparkContext:

hadoop@hadoop:~$ spark-shell --master local[2]

import org.apache.spark.streaming._
// Build the StreamingContext on top of the SparkContext that spark-shell already
// provides as `sc`, so the existing context does not have to be stopped.
// Batches are cut every 15 seconds.
val ssc = new StreamingContext(sc, Seconds(15))
// Lines typed into `nc -lk 1234` arrive here as a DStream of strings.
val d1 = ssc.socketTextStream("localhost", 1234)
// Per-batch word count: split on single spaces, pair each word with 1, sum per word.
val d2 = d1.flatMap(x => x.split(" ")).map(x => (x, 1)).reduceByKey((x, y) => x + y)

// Register an output operation (console print), then start receiving and processing.
d2.print()
ssc.start()


hadoop@hadoop:~$ nc -lk 1234
hadoop is batch processing

Time: 1549458165000 ms

import org.apache.spark.sql.SQLContext
import org.apache.spark.streaming.{Seconds,StreamingContext}
import org.apache.spark.{SparkConf,SparkContext}
import org.apache.spark.sql.functions._

/**
 * Streaming word count that converts every micro-batch into a Dataset and
 * aggregates it with the DataFrame API.
 *
 * Reads text lines from a socket on localhost:3456, and for each 2-second batch
 * prints a table of (word, count) to the console.
 */
object Streaming1 {

  /**
   * Entry point.
   *
   * @param args command-line arguments; not read, so calling `main(null)` from
   *             the shell is fine
   */
  def main(args: Array[String]): Unit = {

    // Create a configuration object; master and app name are expected to be
    // supplied by the launcher (e.g. spark-shell --master local[2]).
    val conf = new SparkConf()
    // Creation of the SparkContext
    val sc = new SparkContext(conf)
    // Creation of the SQLContext
    val sqlContext = new SQLContext(sc)
    // Brings the encoders and the RDD-to-Dataset conversion (toDS) into scope.
    import sqlContext.implicits._
    // Creation of the StreamingContext with a 2-second batch interval
    val ssc = new StreamingContext(sc, Seconds(2))
    // Receiver port
    val d1 = ssc.socketTextStream("localhost", 3456)

    // DStream (sequence of RDDs) into Datasets: foreachRDD runs once per batch.
    d1.foreachRDD { rdd =>
      // Skip idle batches so an empty table is not printed every 2 seconds.
      if (!rdd.isEmpty()) {
        val ds1 = rdd.toDS()
        val ds2 = ds1.flatMap(line => line.split(" "))
        // Tuple columns are named _1 (word) and _2 (the constant 1).
        val ds3 = ds2.map(word => (word, 1))
        // Count occurrences per word; the result columns are `_1` and `count(_2)`.
        ds3.groupBy("_1").agg(count("_2")).show()
      }
    }

    // Nothing is received or computed until the context is started.
    ssc.start()
    // Block until the streaming job is stopped.
    ssc.awaitTermination()
  }
}

// start netcat and open port 3456

hadoop@hadoop:~/Desktop/vow$ nc -lk 3456
god hath no better prize
love is love
love is life
life is love

//start spark shell

$ spark-shell --master local[2]

//stop existing SparkContext
scala> sc.stop

//load spark (scala) file
scala> :load streamingDF.sc

// call the main method
scala> Streaming1.main(null)
// result
|    _1|count(_2)|
|  love|        4|
|  life|        2|
|    is|        3|
|  hath|        1|
|   god|        1|
|better|        1|
|    no|        1|
| prize|        1|

