userID, MovieID, Rating, TimeStamp
---------------------------------
196 242 3 881250949
186 302 3 891717742
22 377 1 878887116
244 51 2 880606923
166 346 1 886397596
CODE:
// Load the space-separated ratings file and parse every line into a
// (userid, movieid, rating, time) tuple. The first three fields are parsed
// as Int; the fourth (time) is kept as a String at this stage.
val r1=sc.textFile("/user/osboxes/ratings.txt").map { line =>
  val fields=line.split(" ")
  (fields(0).toInt,fields(1).toInt,fields(2).toInt,fields(3))
}
// Name the tuple positions so later steps can refer to columns by name.
val ratings_df=r1.toDF("userid","movieid","rating","time")
// persist only marks the DataFrame for caching; the cache is filled by the
// first action. Calling it before count lets count populate the cache and
// show(10) reuse it, instead of count running uncached.
ratings_df.persist
ratings_df.count
ratings_df.show(10)
+------+-------+------+---------+
|userid|movieid|rating| time|
+------+-------+------+---------+
| 196| 242| 3|881250949|
| 186| 302| 3|891717742|
| 22| 377| 1|878887116|
| 244| 51| 2|880606923|
| 166| 346| 1|886397596|
+------+-------+------+---------+
sc.startTime
res28: Long = 1550092485906
ratings_df.printSchema
root
|-- userid: integer (nullable = false)
|-- movieid: integer (nullable = false)
|-- rating: integer (nullable = false)
|-- time: string (nullable = true)
// Replace the string-typed "time" column with the same values cast to long;
// the column keeps its name, so the frame's layout is unchanged.
val ratings_df2={
  val timeAsLong=col("time").cast("Long")
  ratings_df.withColumn("time",timeAsLong)
}
ratings_df2.show()
|userid|movieid|rating| time|
+------+-------+------+---------+
| 196| 242| 3|881250949|
| 186| 302| 3|891717742|
| 22| 377| 1|878887116|
| 244| 51| 2|880606923|
| 166| 346| 1|886397596|
+------+-------+------+---------+
scala> ratings_df2.printSchema
root
|-- userid: integer (nullable = false)
|-- movieid: integer (nullable = false)
|-- rating: integer (nullable = false)
|-- time: long (nullable = true)
scala>import java.util.Date
import java.text.SimpleDateFormat
/** Formats an epoch timestamp, given in milliseconds, as "yyyy-MM-dd HH:mm:ss".
  *
  * The text is rendered in the JVM default time zone. A fresh SimpleDateFormat
  * is built on every call; SimpleDateFormat instances are not thread-safe, so
  * none is shared between calls.
  *
  * @param epochMillis milliseconds since 1970-01-01T00:00:00Z
  * @return the formatted date-time, using a 24-hour clock
  */
def epochToTimeStamp(epochMillis:Long):String={
  // "HH" is hour-of-day (00-23). The 12-hour "hh" (01-12) without an am/pm
  // marker made afternoon times indistinguishable from morning ones.
  val formatter=new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
  formatter.format(new java.util.Date(epochMillis))
}
epochToTimeStamp(121423534543L)
res32: String = 1973-11-06 02:15:34
// Make the converter available to SQL queries under the name "timestamp" ...
sqlContext.udf.register("timestamp",epochToTimeStamp _)
// ... and wrap it as a column function for the DataFrame API.
val timestamp=udf(epochToTimeStamp _)
// The sample "time" values (e.g. 881250949) are epoch seconds, while
// epochToTimeStamp expects milliseconds; passing them unscaled is what made
// every row render as 1970-01-11. Scale to milliseconds before converting.
val ratings_df3=ratings_df2.withColumn("time",timestamp(col("time")*1000L))
// show returns Unit, so it is called separately to keep ratings_df3 bound to
// the DataFrame that the following statements use.
ratings_df3.show(5)
+------+-------+------+-------------------+
|userid|movieid|rating| time|
+------+-------+------+-------------------+
| 196| 242| 3|1970-01-11 10:17:30|
| 186| 302| 3|1970-01-11 01:11:57|
| 22| 377| 1|1970-01-11 09:38:07|
| 244| 51| 2|1970-01-11 10:06:46|
| 166| 346| 1|1970-01-11 11:43:17|
+------+-------+------+-------------------+
ratings_df3.printSchema
root
|-- userid: integer (nullable = false)
|-- movieid: integer (nullable = false)
|-- rating: integer (nullable = false)
|-- time: string (nullable = true)
// select takes either all column names (String) or all Column expressions,
// not a mix of the two, so the plain columns are wrapped in col(...) next to
// the year/month/hour expressions derived from "time".
ratings_df3.select(col("userid"),col("movieid"),col("rating"),year(col("time")),month(col("time")),hour(col("time"))).show(20)
+------+-------+------+----------+-----------+----------+
|userid|movieid|rating|year(time)|month(time)|hour(time)|
+------+-------+------+----------+-----------+----------+
| 196| 242| 3| 1970| 1| 10|
| 186| 302| 3| 1970| 1| 1|
| 22| 377| 1| 1970| 1| 9|
| 244| 51| 2| 1970| 1| 10|
| 166| 346| 1| 1970| 1| 11|
+------+-------+------+----------+-----------+----------+
// Save the frame as the table "default.rating" (the method is saveAsTable).
// NOTE(review): ratings_df4 is not defined in the visible session — presumably
// it is the year/month/hour projection of ratings_df3; confirm before running.
ratings_df4.write.saveAsTable("default.rating")
super Ramesh
ReplyDelete