Wednesday, February 13, 2019

SPARK UDF Operation


userID, MovieID, Rating, TimeStamp
---------------------------------
196 242 3 881250949
186 302 3 891717742
22 377 1 878887116
244 51 2 880606923
166 346 1 886397596


CODE:

// Read the raw ratings file and turn each space-separated line into a
// (userid, movieid, rating, time) tuple. The first three fields are parsed
// as Int; the timestamp is kept as the original String.
val r1 = sc.textFile("/user/osboxes/ratings.txt").map { line =>
  val fields = line.split(" ")
  (fields(0).toInt, fields(1).toInt, fields(2).toInt, fields(3))
}

// Attach column names to the tuple fields so the RDD becomes a DataFrame.
val ratings_df = r1.toDF("userid", "movieid", "rating", "time")
// Mark the DataFrame for caching before the first action: persist is lazy, so
// the count that follows is what fills the cache, and later actions can reuse
// it instead of re-reading and re-parsing the file.
ratings_df.persist()
ratings_df.count()
ratings_df.show(10)

+------+-------+------+---------+
|userid|movieid|rating|     time|
+------+-------+------+---------+
|   196|    242|     3|881250949|
|   186|    302|     3|891717742|
|    22|    377|     1|878887116|
|   244|     51|     2|880606923|
|   166|    346|     1|886397596|
+------+-------+------+---------+

sc.startTime
res28: Long = 1550092485906
ratings_df.printSchema
root
 |-- userid: integer (nullable = false)
 |-- movieid: integer (nullable = false)
 |-- rating: integer (nullable = false)
 |-- time: string (nullable = true)

// Re-type the string `time` column as a long; reusing the name "time" in
// withColumn replaces the existing column instead of adding a new one.
val timeAsLong = col("time").cast("Long")
val ratings_df2 = ratings_df.withColumn("time", timeAsLong)

ratings_df2.show

+------+-------+------+---------+
|userid|movieid|rating|     time|
+------+-------+------+---------+
|   196|    242|     3|881250949|
|   186|    302|     3|891717742|
|    22|    377|     1|878887116|
|   244|     51|     2|880606923|
|   166|    346|     1|886397596|
+------+-------+------+---------+

scala> ratings_df2.printSchema
root
 |-- userid: integer (nullable = false)
 |-- movieid: integer (nullable = false)
 |-- rating: integer (nullable = false)
 |-- time: long (nullable = true)

scala>import java.util.Date
import java.text.SimpleDateFormat
/** Formats an epoch timestamp given in milliseconds as `yyyy-MM-dd HH:mm:ss`.
  *
  * The text is rendered in the JVM's default time zone.
  *
  * @param epochMillis milliseconds since 1970-01-01T00:00:00Z
  * @return the formatted date-time string, using a 24-hour clock
  */
def epochToTimeStamp(epochMillis:Long):String={
   // `HH` is the 24-hour clock (00-23). The 12-hour `hh` pattern without an
   // am/pm marker is ambiguous: 14:15:34 and 02:15:34 both print as 02:15:34,
   // which also makes hour(...) wrong on the formatted string.
   // A new formatter is built per call because SimpleDateFormat is not thread-safe.
   val formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
   formatter.format(new java.util.Date(epochMillis))
}

epochToTimeStamp(121423534543L)
res32: String = 1973-11-06 02:15:34

// Make the formatter callable from Spark SQL queries under the name "timestamp".
sqlContext.udf.register("timestamp", epochToTimeStamp _)

// Wrap the same function as a column-level UDF for the DataFrame API.
val timestamp = udf(epochToTimeStamp _)

// Replace the numeric `time` column with its formatted string. The DataFrame is
// bound first and displayed in a separate statement: `.show(5)` returns Unit, so
// chaining it onto the assignment would leave `ratings_df3` unusable for the
// printSchema / select calls that follow.
// NOTE(review): the sample `time` values (e.g. 881250949) look like epoch
// seconds, while epochToTimeStamp expects milliseconds -- that is why every row
// lands on 1970-01-11. Pass col("time") * 1000 if real calendar dates are wanted.
val ratings_df3 = ratings_df2.withColumn("time", timestamp(col("time")))
ratings_df3.show(5)

+------+-------+------+-------------------+
|userid|movieid|rating|               time|
+------+-------+------+-------------------+
|   196|    242|     3|1970-01-11 10:17:30|
|   186|    302|     3|1970-01-11 01:11:57|
|    22|    377|     1|1970-01-11 09:38:07|
|   244|     51|     2|1970-01-11 10:06:46|
|   166|    346|     1|1970-01-11 11:43:17|
+------+-------+------+-------------------+

ratings_df3.printSchema
root
 |-- userid: integer (nullable = false)
 |-- movieid: integer (nullable = false)
 |-- rating: integer (nullable = false)
 |-- time: string (nullable = true)

// `select` takes either column names (all Strings) or Column expressions (all
// Columns), never a mix, so the following form is rejected by the compiler:
//   ratings_df3.select("userid","movieid","rating",year(col("time")),month(col("time")),hour(col("time"))).show(20)

// Wrapping every plain name in col(...) gives a uniform list of Columns.
ratings_df3.select(col("userid"),col("movieid"),col("rating"),year(col("time")),month(col("time")),hour(col("time"))).show(20)
+------+-------+------+----------+-----------+----------+
|userid|movieid|rating|year(time)|month(time)|hour(time)|
+------+-------+------+----------+-----------+----------+
|   196|    242|     3|      1970|          1|        10|
|   186|    302|     3|      1970|          1|         1|
|    22|    377|     1|      1970|          1|         9|
|   244|     51|     2|      1970|          1|        10|
|   166|    346|     1|      1970|          1|        11|
+------+-------+------+----------+-----------+----------+

// Save the DataFrame as the table `rating` in the `default` database.
// The writer method is `saveAsTable` (capital T); `saveAstable` does not exist.
// NOTE(review): `ratings_df4` is never defined earlier in this walkthrough --
// presumably it is meant to hold the result of the preceding select; confirm
// and assign it before running this line.
ratings_df4.write.saveAsTable("default.rating")

1 comment:

Python Challenges Program

Challenges program: program 1: #Input :ABAABBCA #Output: A4B3C1 str1="ABAABBCA" str2="" d={} for x in str1: d[x]=d...