WEBLOGS Analysis:
val rdd1=sc.textFile("file:///home/osboxes/weblog.tsv")
Input file:
host logname time method url response bytes referer useragent
199.72.81.55 - 804571201 GET /history/apollo/ 200 6245
199.120.110.21 - 804571209 GET /shuttle/missions/sts-73/mission-sts-73.html 200 4085
199.120.110.21 - 804571211 GET /shuttle/missions/sts-73/sts-73-patch-small.gif 200 4179
205.212.115.106 - 804571212 GET /shuttle/countdown/countdown.html 200 3985
129.94.144.152 - 804571213 GET / 200 7074
129.94.144.152 - 804571217 GET /images/ksclogo-medium.gif 304 0
199.120.110.21 - 804571217 GET /images/launch-logo.gif 200 1713
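// Dotted-quad IPv4 address; the dots are escaped so they match only a literal "."
val ip_pattern="[0-9]{1,3}\\.[0-9]{1,3}\\.[0-9]{1,3}\\.[0-9]{1,3}".r
// The "-" logname placeholder followed by the numeric field
// (this is the "time" column of the header; the post calls it "port")
val port_pattern="\\s-\\s*[0-9]{1,9}".r
val rdd2=rdd1.map{x=>
val ip=ip_pattern.findFirstIn(x).get
val port=port_pattern.findFirstIn(x).get
// Keep only the digits, dropping the leading whitespace and "-"
val port1=port.filter(_.isDigit)
(ip,port1)
}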
rdd2.take(10).foreach(println)
scala> rdd2.take(10).foreach(println)
(199.72.81.55,804571201)
(199.120.110.21,804571209)
(199.120.110.21,804571211)
(205.212.115.106,804571212)
(129.94.144.152,804571213)
(129.94.144.152,804571217)
(199.120.110.21,804571217)
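The extraction step above can also be tried without a cluster. What follows is only a minimal sketch in plain Scala, assuming the fields are separated by whitespace and the logname is always "-". The names `WeblogParse`, `LinePattern` and `parse` are hypothetical and not part of the original code. It uses one regex with two capture groups and returns `None` for lines that do not match (a header line, for example) instead of throwing the way `.get` does.

```scala
// Hypothetical helper, not from the original post: parse one log line with a single regex.
object WeblogParse {
  // Group 1: dotted-quad host. Group 2: the numeric field that follows the "-" logname.
  private val LinePattern =
    """^(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})\s+-\s*(\d{1,9})\b.*""".r

  // A regex used as an extractor must match the whole line, hence the trailing ".*".
  // Lines that do not match yield None rather than an exception.
  def parse(line: String): Option[(String, String)] = line match {
    case LinePattern(ip, time) => Some((ip, time))
    case _                     => None
  }
}
```

In the shell this could be plugged in as `rdd1.flatMap(line => WeblogParse.parse(line))`, which silently skips unparseable lines; whether skipping or failing is preferable depends on how clean the input is.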
val df1=rdd2.toDF("ipAddress","port")
df1.show
scala> df1.printSchema
root
|-- ipAddress: string (nullable = true)
|-- port: string (nullable = true)
scala> df1.show
+---------------+---------+
| ipAddress| port|
+---------------+---------+
| 199.72.81.55|804571201|
| 199.120.110.21|804571209|
| 199.120.110.21|804571211|
|205.212.115.106|804571212|
| 129.94.144.152|804571213|
| 129.94.144.152|804571217|
| 199.120.110.21|804571217|
+---------------+---------+
import org.apache.spark.sql.functions.count
val df2=df1.groupBy("ipAddress").agg(count("*"))
df2.show
scala> df2.show
+---------------+--------+
| ipAddress|count(1)|
+---------------+--------+
| 199.120.110.21| 3|
|205.212.115.106| 1|
| 129.94.144.152| 2|
| 199.72.81.55| 1|
+---------------+--------+
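The per-address counts can be cross-checked in plain Scala on the seven sample rows. This is a sketch only, assuming the pairs printed by `rdd2.take(10)` above; `HitsPerIp`, `sample` and `hits` are illustrative names, not part of the original code.

```scala
// Illustrative cross-check, not from the original post.
object HitsPerIp {
  // The (ipAddress, port) pairs printed by rdd2.take(10) in the post.
  val sample: Seq[(String, String)] = Seq(
    ("199.72.81.55", "804571201"),
    ("199.120.110.21", "804571209"),
    ("199.120.110.21", "804571211"),
    ("205.212.115.106", "804571212"),
    ("129.94.144.152", "804571213"),
    ("129.94.144.152", "804571217"),
    ("199.120.110.21", "804571217")
  )

  // Group rows by address and count each group,
  // mirroring groupBy("ipAddress").agg(count("*")).
  val hits: Map[String, Int] =
    sample.groupBy(_._1).map { case (ip, rows) => ip -> rows.size }

  def main(args: Array[String]): Unit =
    // Print the busiest addresses first.
    hits.toSeq.sortBy(-_._2).foreach { case (ip, n) => println(s"$ip\t$n") }
}
```

The resulting map agrees with the `df2.show` table: 3 rows for 199.120.110.21, 2 for 129.94.144.152, and 1 each for the other two addresses.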
Store into Hive:
df1.write.mode("append").saveAsTable("default.webtable")