Tuesday, February 19, 2019

Twitter Streaming access via Spark Streaming

Twitter Streaming access via Spark Streaming



import org.apache.spark.streaming.twitter.TwitterUtils
import org.apache.spark.streaming.{Seconds,StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.log4j.Logger
import org.apache.log4j.Level

/**
 * Spark Streaming job that tracks a keyword/hashtag on the Twitter streaming API,
 * prints matching tweets every batch and archives their text.
 *
 * Usage: IntTwit [hashtag] [outputPrefix]
 *   hashtag       keyword to track (default "RIPBraveHearts")
 *   outputPrefix  prefix of the per-batch output directories
 *                 (default "hdfs://localhost:9000/trumptweets"; an s3a:// prefix
 *                 such as "s3a://sparksamplebucket/trumptweets" also works once
 *                 S3 credentials are configured)
 *
 * Twitter OAuth credentials are never hard-coded. Each twitter4j.oauth.* value is
 * taken from a -D system property if present, otherwise from the environment
 * variables TWITTER_CONSUMER_KEY, TWITTER_CONSUMER_SECRET, TWITTER_ACCESS_TOKEN
 * and TWITTER_ACCESS_TOKEN_SECRET (get the keys from https://developer.twitter.com).
 */
object IntTwit {

  /** Keyword tracked when no hashtag argument is given. */
  private val DefaultHashtag = "RIPBraveHearts"

  /** Output prefix used when none is given; each non-empty batch goes to `<prefix>-<batchTimeMs>`. */
  private val DefaultOutputPrefix = "hdfs://localhost:9000/trumptweets"

  /** twitter4j OAuth system properties paired with the environment variable that may supply each one. */
  private val TwitterCredentials: Seq[(String, String)] = Seq(
    "twitter4j.oauth.consumerKey"       -> "TWITTER_CONSUMER_KEY",
    "twitter4j.oauth.consumerSecret"    -> "TWITTER_CONSUMER_SECRET",
    "twitter4j.oauth.accessToken"       -> "TWITTER_ACCESS_TOKEN",
    "twitter4j.oauth.accessTokenSecret" -> "TWITTER_ACCESS_TOKEN_SECRET"
  )

  def main(args: Array[String]): Unit = {
    Logger.getLogger("org").setLevel(Level.ERROR)
    Logger.getLogger("akka").setLevel(Level.ERROR)

    val hashtag = args.headOption.getOrElse(DefaultHashtag)
    val outputPrefix = args.lift(1).getOrElse(DefaultOutputPrefix)

    // Fail before starting Spark instead of letting the receiver retry with bad auth.
    configureTwitterCredentials()

    // setIfMissing lets `spark-submit --master ...` override the local default.
    // local[2] is the minimum: the Twitter receiver permanently occupies one core.
    val conf = new SparkConf()
      .setIfMissing("spark.master", "local[2]")
      .setAppName("streamingApp1")
    val sc = new SparkContext(conf)

    // To write to S3, supply AWS credentials from the environment (never hard-code
    // them) and pass an s3a:// output prefix as the second argument:
    // sc.hadoopConfiguration.set("fs.s3a.access.key", sys.env("AWS_ACCESS_KEY_ID"))
    // sc.hadoopConfiguration.set("fs.s3a.secret.key", sys.env("AWS_SECRET_ACCESS_KEY"))

    val ssc = new StreamingContext(sc, Seconds(10))

    // Let Twitter filter server-side: without filters the API only delivers a small
    // random sample, so tweets containing the hashtag would almost never arrive.
    val statuses = TwitterUtils.createStream(ssc, None, Seq(hashtag))
    statuses.print()

    // Hashtags are case-insensitive on Twitter, so compare lower-cased text.
    val needle = hashtag.toLowerCase(java.util.Locale.ROOT)
    val texts = statuses
      .map(status => Option(status.getText).getOrElse(""))
      .filter(_.toLowerCase(java.util.Locale.ROOT).contains(needle))
    texts.print()

    // Write the gathered tweets. saveAsTextFiles would create a directory for every
    // 10 s batch, even empty ones; write only batches that contain tweets, using the
    // same `<prefix>-<batchTimeMs>` naming that saveAsTextFiles uses.
    texts.foreachRDD { (rdd, time) =>
      if (!rdd.isEmpty()) rdd.saveAsTextFile(s"$outputPrefix-${time.milliseconds}")
    }

    ssc.start()
    ssc.awaitTermination()
  }

  /**
   * Ensures every twitter4j OAuth system property is set, filling unset ones from
   * their environment variables.
   *
   * @throws IllegalStateException if any credential is available from neither source
   */
  private def configureTwitterCredentials(): Unit = {
    TwitterCredentials.foreach { case (property, envVar) =>
      if (sys.props.get(property).isEmpty) {
        sys.env.get(envVar).foreach(value => System.setProperty(property, value))
      }
    }
    val missing = TwitterCredentials.collect {
      case (property, envVar) if sys.props.get(property).isEmpty => s"$property (or env $envVar)"
    }
    if (missing.nonEmpty) {
      throw new IllegalStateException(s"Missing Twitter OAuth credentials: ${missing.mkString(", ")}")
    }
  }
}



build.sbt:
-------------
name := "TwitInt"

version := "0.1"

scalaVersion := "2.11.12"

// All Spark modules must share one Spark version; mixing 1.x and 2.x jars fails at runtime.
val sparkVersion = "2.4.0"

// https://mvnrepository.com/artifact/org.apache.spark/spark-core
libraryDependencies += "org.apache.spark" %% "spark-core" % sparkVersion
// https://mvnrepository.com/artifact/org.apache.spark/spark-sql
libraryDependencies += "org.apache.spark" %% "spark-sql" % sparkVersion
// https://mvnrepository.com/artifact/org.apache.spark/spark-streaming
libraryDependencies += "org.apache.spark" %% "spark-streaming" % sparkVersion
// The Twitter connector left Apache Spark after 1.6 and is now maintained by Apache Bahir.
// org.apache.spark:spark-streaming-twitter:1.6.3 is compiled against Spark 1.x and needs
// org.apache.spark.Logging, which Spark 2.x no longer ships (NoClassDefFoundError).
// The Bahir release built for Spark 2.4 keeps the same TwitterUtils package, so no
// extra "logging" jar is required.
// https://mvnrepository.com/artifact/org.apache.bahir/spark-streaming-twitter
libraryDependencies += "org.apache.bahir" %% "spark-streaming-twitter" % "2.4.0"




build.properties:
sbt.version = 1.2.8


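Why this step is needed: the spark-streaming-twitter 1.6.3 artifact was built against Spark 1.x and references the class org.apache.spark.Logging. That class was removed in Spark 2.0, so running the job on Spark 2.4.0 fails with `java.lang.NoClassDefFoundError: org/apache/spark/Logging`. The workaround is to put a jar that provides that class on the classpath.

Alternatively, replace the 1.6.3 dependency with the Apache Bahir connector built for Spark 2.x, `"org.apache.bahir" %% "spark-streaming-twitter" % "2.4.0"`. That makes this jar unnecessary.

Download spark-core_2.11-1.5.2.logging.jar from https://raw.githubusercontent.com/swordsmanliu/SparkStreamingHbase/master/lib/spark-core_2.11-1.5.2.logging.jar

Add the .jar file to the project in IntelliJ:

1. Click File on the menu bar.
2. Choose Project Structure.
3. Select Modules in the left panel.
4. Open the Dependencies tab.
5. Click '+' → JARs or directories.
6. Select spark-core_2.11-1.5.2.logging.jar.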

No comments:

Post a Comment

Python Challenges Program

Challenges program: program 1: #Input :ABAABBCA #Output: A4B3C1 str1="ABAABBCA" str2="" d={} for x in str1: d[x]=d...