Saturday, February 23, 2019

Flume Spark Integration

Step 1: Create the Flume configuration file

flumespark.conf
------------------------------
# Creation of components
agent1.sources = flumesource
agent1.channels = flumechannel
agent1.sinks = flumesink

# Source configuration
agent1.sources.flumesource.type = netcat
agent1.sources.flumesource.bind = localhost
agent1.sources.flumesource.port = 1234
agent1.sources.flumesource.channels = flumechannel

# Channel configuration
agent1.channels.flumechannel.type = memory
agent1.channels.flumechannel.capacity=1000
agent1.channels.flumechannel.transactionCapacity=100

# Sink configuration
agent1.sinks.flumesink.type = avro
# hostname is the IP address of the Windows machine running the Spark program
agent1.sinks.flumesink.hostname = 192.168.0.104
agent1.sinks.flumesink.port = 7777
agent1.sinks.flumesink.channel = flumechannel
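
This is the push-based approach: the Avro sink pushes events to the Spark receiver, so the Spark streaming program on Windows must already be running and listening on 192.168.0.104:7777 before the Flume agent starts. A quick reachability check from Ubuntu (a rough sketch, assuming netcat is installed) is:

# Reports success only when the Spark receiver is already listening on 7777
nc -zv 192.168.0.104 7777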



2) Set up the program in IntelliJ on Windows

import org.apache.spark.streaming.flume._
import org.apache.spark.streaming.{Seconds,StreamingContext}
import org.apache.spark.{SparkConf,SparkContext}
import org.apache.log4j.Logger
import org.apache.log4j.Level
object FlumeSpark {
  Logger.getLogger("org").setLevel(Level.OFF)
  Logger.getLogger("akka").setLevel(Level.OFF)
  def main(args: Array[String]): Unit = {
    // local[2]: one thread for the Flume receiver, one for processing the batches
    val conf = new SparkConf().setAppName("spark-flume-integration (push-based approach)").setMaster("local[2]")
    val sc = new SparkContext(conf)
    val batchInterval = 20      // batch interval in seconds
    val host = "192.168.0.104"  // Windows IP address the receiver binds to
    val portno = 7777           // must match the Avro sink port in flumespark.conf
    val ssc = new StreamingContext(sc, Seconds(batchInterval))

    // Receive the events pushed by the Flume Avro sink
    val flumedata = FlumeUtils.createStream(ssc, host, portno)
    val res = flumedata.map { x =>
      val event = x.event
      // Decode the Avro event body into a plain string
      val messageBody = new String(event.getBody.array())
      messageBody
    }
    res.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
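
The decoded lines form a normal DStream, so further transformations can be chained before ssc.start(). As a rough sketch (not part of the original program), a per-batch word count over the incoming Flume messages could be added inside main:

    // Hypothetical extension: word count over each batch (place before ssc.start())
    val words = res.flatMap(_.split("\\s+"))
    val wordCounts = words.map(word => (word, 1)).reduceByKey(_ + _)
    wordCounts.print()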


build.sbt in IntelliJ


name := "SparkSampleProgram"

version := "0.1"

scalaVersion := "2.11.12"
libraryDependencies += "org.apache.spark" %% "spark-core" % "2.4.0"
libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.4.0"
libraryDependencies += "org.apache.spark" %% "spark-streaming" % "2.4.0"

libraryDependencies += "org.apache.spark" %% "spark-streaming-twitter" % "1.6.3"
libraryDependencies += "org.apache.spark" %% "spark-streaming-flume-assembly" % "2.3.2"


project/build.properties in IntelliJ

sbt.version = 1.0.3
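
If the program is packaged as a jar instead of being run from IntelliJ, it can also be launched with spark-submit. This is only a sketch; the jar path below is what sbt package would produce by default for this project and is an assumption:

spark-submit --class FlumeSpark --master local[2] --packages org.apache.spark:spark-streaming-flume-assembly_2.11:2.3.2 target/scala-2.11/sparksampleprogram_2.11-0.1.jar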


3) Run the Flume agent on Ubuntu


flume-ng agent --name agent1 --conf /home/hadoop/Desktop/vow --conf-file /home/hadoop/Desktop/vow/flumespark.conf -Dflume.root.logger=DEBUG,console


4) Send a message from Ubuntu

curl telnet://localhost:1234
I love india   ---> this message is passed to the Spark program on the Windows host
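
Every line typed after the curl command is delivered to the netcat source and printed by the Spark program in the next 20-second batch. An equivalent way to send a test line (assuming netcat is installed on Ubuntu):

echo "I love india" | nc localhost 1234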



