1) Create the Flume configuration file: flumespark.conf
------------------------------
# Creation of components
agent1.sources = flumesource
agent1.channels = flumechannel
agent1.sinks = flumesink

# Source configuration
agent1.sources.flumesource.type = netcat
agent1.sources.flumesource.bind = localhost
agent1.sources.flumesource.port = 1234
agent1.sources.flumesource.channels = flumechannel

# Channel configuration
agent1.channels.flumechannel.type = memory
agent1.channels.flumechannel.capacity = 1000
agent1.channels.flumechannel.transactionCapacity = 100

# Sink configuration
agent1.sinks.flumesink.type = avro
# hostname is the Windows IP address, where the Spark program will be listening
agent1.sinks.flumesink.hostname = 192.168.0.104
agent1.sinks.flumesink.port = 7777
agent1.sinks.flumesink.channel = flumechannel
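The hostname and port on the Avro sink are where Spark's Flume receiver will be listening, so they must match the values the Step 2 program binds to. The corresponding lines in the Scala code below are:

val host = "192.168.0.104"   // must match agent1.sinks.flumesink.hostname
val portno = 7777            // must match agent1.sinks.flumesink.port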
2) Set up the program in IntelliJ on Windows
import org.apache.spark.streaming.flume._
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.log4j.Logger
import org.apache.log4j.Level

object FlumeSpark {
  // Silence the noisy Spark/Akka logging so the streamed output is readable
  Logger.getLogger("org").setLevel(Level.OFF)
  Logger.getLogger("akka").setLevel(Level.OFF)

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("Spark-Flume integration (push-based approach)")
      .setMaster("local[2]")
    val sc = new SparkContext(conf)
    val batchInterval = 20
    val host = "192.168.0.104"
    val portno = 7777
    val ssc = new StreamingContext(sc, Seconds(batchInterval))

    // Receive the data from the Flume agent: Flume's Avro sink pushes events
    // to this host/port, so this program must be running before the agent starts
    val flumedata = FlumeUtils.createStream(ssc, host, portno)

    // Each record is a SparkFlumeEvent; decode the Avro event body to a String
    val res = flumedata.map { x =>
      val event = x.event
      val messageBody = new String(event.getBody.array())
      messageBody
    }
    res.print()

    ssc.start()
    ssc.awaitTermination()
  }
}
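FlumeUtils.createStream is the push-based approach: Flume's Avro sink pushes events into a receiver hosted by Spark. Spark's Flume module also offers a pull-based alternative, FlumeUtils.createPollingStream, where Spark polls events out of a special SparkSink running inside the Flume agent. A minimal sketch of the line that changes, assuming the agent's sink type is switched to org.apache.spark.streaming.flume.sink.SparkSink (with the spark-streaming-flume-sink jar on the Flume classpath) and host/port now point at the Flume machine rather than the Windows host:

// Pull-based variant: Spark polls events from Flume's SparkSink
// instead of hosting an Avro receiver itself
val flumedata = FlumeUtils.createPollingStream(ssc, host, portno)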
build.sbt in IntelliJ
name := "SparkSampleProgram"
version := "0.1"
scalaVersion := "2.11.12"
libraryDependencies += "org.apache.spark" %% "spark-core" % "2.4.0"
libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.4.0"
libraryDependencies += "org.apache.spark" %% "spark-streaming" % "2.4.0"
libraryDependencies += "org.apache.spark" %% "spark-streaming-twitter" % "1.6.3"
libraryDependencies += "org.apache.spark" %% "spark-streaming-flume- assembly" % "2.3.2"
project/build.properties (not build.sbt):
sbt.version = 1.0.3
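All the Spark artifacts should stay on the same version to avoid binary incompatibilities. A small refactor of the dependency list above, a sketch assuming Spark 2.4.0 throughout (spark-streaming-twitter is left out because this program does not use it, and it stopped shipping with Spark after the 1.6.x line):

val sparkVersion = "2.4.0"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core"                     % sparkVersion,
  "org.apache.spark" %% "spark-sql"                      % sparkVersion,
  "org.apache.spark" %% "spark-streaming"                % sparkVersion,
  "org.apache.spark" %% "spark-streaming-flume-assembly" % sparkVersion
)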
3) Run the Flume agent on Ubuntu
flume-ng agent --name agent1 --conf /home/hadoop/Desktop/vow --conf-file /home/hadoop/Desktop/vow/flumespark.conf -Dflume.root.logger=DEBUG,console
4) Send a message from Ubuntu
curl telnet://localhost:1234
I love india ---> this message is passed through Flume to the Windows host
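If everything is wired up correctly, the message shows up on the IntelliJ console on Windows at the next 20-second batch boundary; the output of res.print() looks roughly like this (the timestamp will differ):

-------------------------------------------
Time: 1556666660000 ms
-------------------------------------------
I love india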