Twitter Streaming access via Spark Streaming
import org.apache.spark.streaming.twitter.TwitterUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.log4j.{Level, Logger}

object IntTwit {
  def main(args: Array[String]): Unit = {
    // Keep the console readable: silence Spark's and Akka's INFO/WARN logging
    Logger.getLogger("org").setLevel(Level.ERROR)
    Logger.getLogger("akka").setLevel(Level.ERROR)

    // Create a configuration object: run locally with two threads
    // (one for the Twitter receiver, one for processing the batches)
    val conf = new SparkConf()
    conf.set("spark.master", "local[2]")
    conf.set("spark.app.name", "streamingApp1")

    // Create the Spark context
    val sc = new SparkContext(conf)

    // AWS credentials (only needed when writing to the S3 bucket below)
    //sc.hadoopConfiguration.set("fs.s3a.access.key", "AKIAJ7....")
    //sc.hadoopConfiguration.set("fs.s3a.secret.key", "Btqy7XO...")

    // Twitter OAuth keys: collect these from https://developer.twitter.com
    System.setProperty("twitter4j.oauth.consumerKey", "5GtJQ213......")
    System.setProperty("twitter4j.oauth.consumerSecret", "1k1phi12f....")
    System.setProperty("twitter4j.oauth.accessToken", "19807726-7pxmUfe...")
    System.setProperty("twitter4j.oauth.accessTokenSecret", "AmJ0K5gQ4pIxK....")

    // Streaming context with a 10-second batch interval
    val ssc = new StreamingContext(sc, Seconds(10))

    // Receive tweets from Twitter
    val ds1 = TwitterUtils.createStream(ssc, None)
    ds1.print()

    // Keep only the text of tweets that mention the hashtag of interest
    val ds2 = ds1.map(x => x.getText).filter(x => x.contains("RIPBraveHearts"))
    ds2.print()

    // Write gathered tweets into an Amazon S3 bucket
    //ds2.saveAsTextFiles("s3a://sparksamplebucket/trumptweets")

    // Write gathered tweets into the Hadoop file system
    ds2.saveAsTextFiles("hdfs://localhost:9000/trumptweets")

    ssc.start()
    ssc.awaitTermination()
  }
}
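Once the job is running, saveAsTextFiles writes one output directory per 10-second batch, named trumptweets-<batch time in ms>. Below is a minimal sketch for checking what has been written, assuming the same local HDFS URL as above; ReadSavedTweets is a hypothetical helper, not part of the original program:

import org.apache.spark.{SparkConf, SparkContext}

object ReadSavedTweets {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("readSavedTweets")
    val sc = new SparkContext(conf)
    // saveAsTextFiles produces one directory per batch (prefix-<timestamp>),
    // so a glob collects everything written so far
    val saved = sc.textFile("hdfs://localhost:9000/trumptweets-*")
    println(s"tweets written so far: ${saved.count()}")
    saved.take(10).foreach(println)
    sc.stop()
  }
}

Run it as a separate, ordinary (non-streaming) Spark job after a few batches have completed.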
build.sbt:
-------------
name := "TwitInt"
version := "0.1"
scalaVersion := "2.11.12"
// https://mvnrepository.com/artifact/org.apache.spark/spark-core
libraryDependencies += "org.apache.spark" %% "spark-core" % "2.4.0"
// https://mvnrepository.com/artifact/org.apache.spark/spark-sql
libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.4.0"
// https://mvnrepository.com/artifact/org.apache.spark/spark-streaming
libraryDependencies += "org.apache.spark" %% "spark-streaming" % "2.4.0"
// https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-twitter
libraryDependencies += "org.apache.spark" %% "spark-streaming-twitter" % "1.6.3"
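Note that spark-streaming-twitter was last published by the Spark project for the 1.6.x line, which is why its version does not match the 2.4.0 used for the other dependencies. If that combination causes problems, a possible alternative (an assumption, not what this project uses) is the same connector maintained by Apache Bahir for Spark 2.x, for example:

// Alternative Twitter connector from Apache Bahir (version assumed to match Spark 2.4.0)
libraryDependencies += "org.apache.bahir" %% "spark-streaming-twitter" % "2.4.0"

The package and the TwitterUtils API stay the same, so the source code above does not change.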
project/build.properties:
-------------
sbt.version = 1.2.8
Download spark-core_2.11-1.5.2.logging.jar from https://raw.githubusercontent.com/swordsmanliu/SparkStreamingHbase/master/lib/spark-core_2.11-1.5.2.logging.jar
This jar supplies the old org.apache.spark.Logging class that the 1.6.3 Twitter connector still references but that Spark 2.x no longer exposes.
Add that .jar file to the module in IntelliJ:
Click File on the toolbar → Project Structure
Select Modules in the left panel
Open the Dependencies tab
'+' → JARs or directories
Select spark-core_2.11-1.5.2.logging.jar
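If you build and run with sbt instead of the IDE, a simpler alternative (an assumption, not part of the original steps) is to drop the downloaded jar into the project's lib/ folder, which sbt already treats as its unmanaged dependency directory; the location can also be made explicit in build.sbt:

// Optional: state the unmanaged-jar folder explicitly (lib/ is already the sbt default)
unmanagedBase := baseDirectory.value / "lib"

With the jar in lib/, it is on the compile and run classpath without any IntelliJ configuration.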