Friday, February 15, 2019

Spark RDD vs DataFrame vs Dataset: Fundamental Concepts

Comparison of RDD vs DataFrame vs Dataset

Important:

DF  --> Java/Kryo serialization <> Catalyst optimizer <> lazy evaluation <> persistence <> file verification <> SQL operations <> schema <> structured data

DS  --> encoders <> Catalyst optimizer <> lazy evaluation <> persistence <> file verification <> in 1.x non-SQL only, from 2.x both SQL and non-SQL operations <> schema <> structured, semi-structured and unstructured data

RDD --> Java/Kryo serialization <> no optimizer <> lazy evaluation <> persistence <> no file verification <> non-SQL operations <> no schema <> unstructured data
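
To see the optimizer difference from the shell (a quick sketch, not part of the original run): a DataFrame exposes its Catalyst plan via explain, while an RDD only has a raw lineage:

val df=spark.range(100).toDF("n")
df.filter("n % 2 = 0").explain            // Catalyst-optimized physical plan

val r=sc.parallelize(1 to 100)
println(r.filter(_%2==0).toDebugString)   // plain lineage, no optimizer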


From Spark 2.x onwards, the Dataset API supports both styles:
DS API = RDD API (functional operations) + DF API (SQL operations)
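
For example (a sketch assuming a 2.x spark-shell, where spark.implicits._ is already imported), one Dataset takes both styles of operation:

val ds=Seq((1,"spark"),(2,"hadoop")).toDS
ds.filter(_._1>1).show    // functional, RDD-style
ds.select("_2").show      // relational, DataFrame-style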


Files supported:

RDD       = unstructured files (text files)
Dataset   = SQL + functional operations from 2.0 onwards; in the 1.x versions only unstructured data was supported
DataFrame = SQL operations on structured formats (CSV, Parquet, ORC, XML, Avro)
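
A sketch of reading a few of these formats (the parquet/orc paths are hypothetical; Parquet and ORC support is built in, CSV is built in from 2.x, and XML/Avro need the external packages shown later in this post):

val csvDF=spark.read.format("csv").load("file:///home/osboxes/csvfile")
val parquetDF=spark.read.format("parquet").load("file:///home/osboxes/parquetfile")
val orcDF=spark.read.format("orc").load("file:///home/osboxes/orcfile")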

Conversion:

RDD, DataFrame, and Dataset are interchangeable:
RDD       ===> transform ===> Dataset & DataFrame
DataFrame ===> transform ===> RDD & Dataset
Dataset   ===> transform ===> RDD & DataFrame
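
All of these conversions in one compact sketch (spark-shell; the Rec case class and column names are illustrative):

val rdd1=sc.parallelize(Seq((1,"a"),(2,"b")))
val df1=rdd1.toDF("id","name")    // RDD ===> DataFrame
val ds1=rdd1.toDS                 // RDD ===> Dataset
case class Rec(id:Int,name:String)
val dsFromDf=df1.as[Rec]          // DataFrame ===> Dataset
val rddFromDf=df1.rdd             // DataFrame ===> RDD[Row]
val dfFromDs=dsFromDf.toDF        // Dataset ===> DataFrame
val rddFromDs=dsFromDf.rdd        // Dataset ===> RDD[Rec]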

RDD                  DF               DS
------------------------------------------------------------------
Non-SQL operations   SQL operations   1.x: non-SQL only; 2.x: SQL + non-SQL
No schema            Schema           Schema

CODE:
RDD:
scala> val rd=sc.textFile("/user/osboxes/movies.txt")

rd: org.apache.spark.rdd.RDD[String] = /user/osboxes/movies.txt MapPartitionsRDD[7] at textFile at <console>:24

scala> rd.take(5).foreach(println)

1,Toy Story (1995),Adventure|Animation|Children|Comedy|Fantasy
2,Jumanji (1995),Adventure|Children|Fantasy
3,Grumpier Old Men (1995),Comedy|Romance
4,Waiting to Exhale (1995),Comedy|Drama|Romance
5,Father of the Bride Part II (1995),Comedy

val rd2=rd.map(x=>x.split(","))

rd2: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD

rd2.take(5).foreach(println)

val rd3=rd2.map(x=>{
val mid=x(0).toInt     // movie id as Int
val title=x(1)         // movie title
val genre=x(2)         // pipe-separated genres
(mid,title,genre)
})

rd3.take(5).foreach(println)

(1,Toy Story (1995),Adventure|Animation|Children|Comedy|Fantasy)
(2,Jumanji (1995),Adventure|Children|Fantasy)
(3,Grumpier Old Men (1995),Comedy|Romance)
(4,Waiting to Exhale (1995),Comedy|Drama|Romance)
(5,Father of the Bride Part II (1995),Comedy)

2) samplefile (contents):
Spark is big data technology
hadoop is big data technology
spark and hadoop are big data technologies



val r1=sc.textFile("file:///home/osboxes/samplefile")

r1: org.apache.spark.rdd.RDD[String] = file:///home/osboxes/samplefile MapPartitionsRDD[5] at textFile at <console>:24

val d1=spark.read.format("text").load("file:///home/osboxes/samplefile")

d1: org.apache.spark.sql.DataFrame = [value: string]

scala> r1.take(5)

res3: Array[String] = Array(Spark is big data technology, hadoop is big data technology, spark and hadoop are big data technologies)
scala> d1.show(5)
+--------------------+
|               value|
+--------------------+
|Spark is big data...|
|hadoop is big dat...|
|spark and hadoop ...|
+--------------------+

val r2=r1.map(x=>x.split(" "))

r2: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[9] at map at <console>:26

scala> val ds1=d1.as[String]

ds1: org.apache.spark.sql.Dataset[String] = [value: string]

scala> val ds2=ds1.map(x=>x.split(" "))

ds2: org.apache.spark.sql.Dataset[Array[String]] = [value: array<string>]

scala> ds2.show
+--------------------+
|               value|
+--------------------+
|[Spark, is, big, ...|
|[hadoop, is, big,...|
|[spark, and, hado...|
+--------------------+
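
The natural follow-up to splitting words is a word count; a sketch of the same count in both styles, using r1 and ds1 from above:

r1.flatMap(_.split(" ")).map(w=>(w,1)).reduceByKey(_+_).collect.foreach(println)   // RDD style
ds1.flatMap(_.split(" ")).groupBy("value").count.show                              // Dataset style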


Interchangeable:
RDD:
val df=rd3.toDF
val ds=rd3.toDS

scala> r1.toDF.show

+--------------------+
|               value|
+--------------------+
|Spark is big data...|
|hadoop is big dat...|
|spark and hadoop ...|
+--------------------+


scala> r1.toDS.show

+--------------------+
|               value|
+--------------------+
|Spark is big data...|
|hadoop is big dat...|
|spark and hadoop ...|
+--------------------+


DataFrame:
case class Newc(_1:Int,_2:String,_3:String)
val ds2=df.as[Newc]
val r5=df.rdd

r5: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[22] at rdd at <console>:25
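
Since df.rdd gives back generic Row objects, fields are recovered by position; a one-line sketch assuming the (_1:Int,_2:String,_3:String) layout above:

val typed=r5.map(row=>(row.getInt(0),row.getString(1),row.getString(2)))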

Dataset:
val df5=ds2.toDF
val rd2=ds2.rdd


DATAFRAME:


scala> val df=spark.read.format("json").load("file:///home/osboxes/jsonfile.json")
df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]
scala> df.show
+---+----+
|age|name|
+---+----+
| 25| abc|
| 55| xyz|
| 45| pqr|
+---+----+
scala> val df=spark.read.format("text").load("file:///home/osboxes/movies.txt")

df: org.apache.spark.sql.DataFrame = [value: string]

scala> df.show
+--------------------+
|               value|
+--------------------+
|1,Toy Story (1995...|
|2,Jumanji (1995),...|
|3,Grumpier Old Me...|
|4,Waiting to Exha...|
|5,Father of the B...|
|6,Heat (1995),Act...|
|7,Sabrina (1995),...|
|8,Tom and Huck (1...|
|9,Sudden Death (1...|
|10,GoldenEye (199...|
|11,"American Pres...|
|12,Dracula: Dead ...|
|13,Balto (1995),A...|
|14,Nixon (1995),D...|
|15,Cutthroat Isla...|
|16,Casino (1995),...|
|17,Sense and Sens...|
|18,Four Rooms (19...|
|19,Ace Ventura: W...|
|20,Money Train (1...|
+--------------------+
only showing top 20 rows

val df2=spark.read.format("json").load("file:///home/osboxes/jsonfile.json")

case class Class1(age:Long,name:String)

val ds2=df2.as[Class1]

ds2.toDF

The case class must be defined before it is used, and its field types must match the inferred schema: the JSON reader infers numbers as bigint, so age has to be Long. Declaring it as Int (case class Data(age:Int,name:String)) fails the analyzer's upcast check from bigint to int.


DATASET:
val df=spark.read.format("text").load("file:///home/osboxes/movies.txt")

scala> val ds1=df.as[String]

ds1: org.apache.spark.sql.Dataset[String] = [value: string]

scala> ds1.show
+--------------------+
|               value|
+--------------------+
|1,Toy Story (1995...|
|2,Jumanji (1995),...|
|3,Grumpier Old Me...|
|4,Waiting to Exha...|
|5,Father of the B...|
|6,Heat (1995),Act...|
|7,Sabrina (1995),...|
|8,Tom and Huck (1...|
|9,Sudden Death (1...|
|10,GoldenEye (199...|
|11,"American Pres...|
|12,Dracula: Dead ...|
|13,Balto (1995),A...|
|14,Nixon (1995),D...|
|15,Cutthroat Isla...|
|16,Casino (1995),...|
|17,Sense and Sens...|
|18,Four Rooms (19...|
|19,Ace Ventura: W...|
|20,Money Train (1...|
+--------------------+
only showing top 20 rows

Dataset can be an alternative to RDD.

scala> val df2=spark.read.format("json").load("file:///home/osboxes/jsonfile.json")

df2: org.apache.spark.sql.DataFrame = [age: bigint, name: string]

scala> case class Class1(Age:Long,Name:String)

defined class Class1

scala> val ds2=df2.as[Class1]

ds2: org.apache.spark.sql.Dataset[Class1] = [age: bigint, name: string]

scala> ds2.show
+---+----+
|age|name|
+---+----+
| 25| abc|
| 55| xyz|
| 45| pqr|
+---+----+
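
Because ds2 is typed, the Class1 fields can be used directly in functional operations, and a misspelled field name is caught at compile time rather than at runtime (a sketch):

ds2.filter(_.Age>30).show           // typed filter on the case-class field
ds2.map(_.Name.toUpperCase).show    // typed map, returns Dataset[String]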

scala> val ds3=ds1.map(x=>x.split(","))

ds3: org.apache.spark.sql.Dataset[Array[String]] = [value: array<string>]

scala> ds3.show
+--------------------+
|               value|
+--------------------+
|[1, Toy Story (19...|
|[2, Jumanji (1995...|
|[3, Grumpier Old ...|
|[4, Waiting to Ex...|
|[5, Father of the...|
|[6, Heat (1995), ...|
|[7, Sabrina (1995...|
|[8, Tom and Huck ...|
|[9, Sudden Death ...|
|[10, GoldenEye (1...|
|[11, "American Pr...|
|[12, Dracula: Dea...|
|[13, Balto (1995)...|
|[14, Nixon (1995)...|
|[15, Cutthroat Is...|
|[16, Casino (1995...|
|[17, Sense and Se...|
|[18, Four Rooms (...|
|[19, Ace Ventura:...|
|[20, Money Train ...|
+--------------------+
only showing top 20 rows

scala> val ds4=ds3.map(x=>(x(0).toInt,x(1),x(2)))

ds4: org.apache.spark.sql.Dataset[(Int, String, String)] = [_1: int, _2: string ... 1 more field]

scala> ds4.show
+---+--------------------+--------------------+
| _1|                  _2|                  _3|
+---+--------------------+--------------------+
|  1|    Toy Story (1995)|Adventure|Animati...|
|  2|      Jumanji (1995)|Adventure|Childre...|
|  3|Grumpier Old Men ...|      Comedy|Romance|
|  4|Waiting to Exhale...|Comedy|Drama|Romance|
|  5|Father of the Bri...|              Comedy|
|  6|         Heat (1995)|Action|Crime|Thri...|
|  7|      Sabrina (1995)|      Comedy|Romance|
|  8| Tom and Huck (1995)|  Adventure|Children|
|  9| Sudden Death (1995)|              Action|
| 10|    GoldenEye (1995)|Action|Adventure|...|
| 11| "American President|         The (1995)"|
| 12|Dracula: Dead and...|       Comedy|Horror|
| 13|        Balto (1995)|Adventure|Animati...|
| 14|        Nixon (1995)|               Drama|
| 15|Cutthroat Island ...|Action|Adventure|...|
| 16|       Casino (1995)|         Crime|Drama|
| 17|Sense and Sensibi...|       Drama|Romance|
| 18|   Four Rooms (1995)|              Comedy|
| 19|Ace Ventura: When...|              Comedy|
| 20|  Money Train (1995)|Action|Comedy|Cri...|
+---+--------------------+--------------------+
Note that movie 11 is split incorrectly: its title, "American President, The (1995)", contains a comma, so the naive split(",") breaks it across two columns. A proper CSV reader handles the quoting (see the sketch after the schema below).
scala> ds4.printSchema
root
 |-- _1: integer (nullable = false)
 |-- _2: string (nullable = true)
 |-- _3: string (nullable = true)
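
Two refinements worth noting (a sketch with illustrative column names): the tuple columns can be renamed, and letting the CSV source parse the file instead of a manual split handles quoted titles like movie 11 correctly:

val movies=ds4.toDF("movieId","title","genres")
val moviesCsv=spark.read.format("csv").load("file:///home/osboxes/movies.txt").toDF("movieId","title","genres")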

READING and WRITING Different FILE FORMATS:

spark-shell --packages com.databricks:spark-csv_2.10:1.5.0,com.databricks:spark-xml_2.10:0.4.1,com.databricks:spark-avro_2.10:4.0.0

scala> val df1=spark.read.format("csv").load("file:///home/osboxes/csvfile")

19/02/16 02:44:25 WARN DataSource: Multiple sources found for csv (org.apache.spark.sql.execution.datasources.csv.CSVFileFormat,
com.databricks.spark.csv.DefaultSource15), defaulting to the internal datasource
(org.apache.spark.sql.execution.datasources.csv.CSVFileFormat).
df1: org.apache.spark.sql.DataFrame = [_c0: string, _c1: string ... 1 more field]


scala> df1.show

+---+---+---+
|_c0|_c1|_c2|
+---+---+---+
|  1|  2|3.4|
|  2|  3|4.5|
|  3|4.5|7.8|

scala> df1

res10: org.apache.spark.sql.DataFrame = [_c0: string, _c1: string ... 1 more field]

scala> case class C3(_C0:String,_C1:String,_C2:String)

defined class C3

scala> val ds1=df1.as[C3]

ds1: org.apache.spark.sql.Dataset[C3] = [_c0: string, _c1: string ... 1 more field]

scala> ds1.show

+---+---+---+
|_c0|_c1|_c2|
+---+---+---+
|  1|  2|3.4|
|  2|  3|4.5|
|  3|4.5|7.8|
+---+---+---+
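
By default the CSV source reads every column as string (which is why C3 declares all three fields as String); the inferSchema option (a sketch) asks Spark to sample the data and pick numeric types instead:

val typedCsv=spark.read.format("csv").option("inferSchema","true").load("file:///home/osboxes/csvfile")
typedCsv.printSchema   // _c0/_c1/_c2 now come back as numeric types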

XML file:
<student>
  <name>kumar</name>
  <id>201</id>
  <course>spark</course>
</student>
<student>
  <name>priya</name>
  <id>202</id>
  <course>scala</course>
  <domain>bigdata</domain>
 </student>

 scala>  val df2=spark.read.format("xml").option("rowTag","student").load("file:///home/osboxes/xmlfile")

df2: org.apache.spark.sql.DataFrame = [course: string, domain: string ... 2 more fields]

scala> df2.show
+------+-------+---+-----+
|course| domain| id| name|
+------+-------+---+-----+
| spark|   null|201|kumar|
| scala|bigdata|202|priya|
+------+-------+---+-----+

scala>  case class c4(course:String,domain:String,id:Long,name:String)

defined class c4

scala>  val ds2=df2.as[c4]

ds2: org.apache.spark.sql.Dataset[c4] = [course: string, domain: string ... 2 more fields]

scala>  ds2.show

+------+-------+---+-----+
|course| domain| id| name|
+------+-------+---+-----+
| spark|   null|201|kumar|
| scala|bigdata|202|priya|
+------+-------+---+-----+

MYSQL:
spark-shell --packages com.databricks:spark-csv_2.10:1.5.0,com.databricks:spark-xml_2.10:0.4.1,org.apache.avro:avro:1.6.0
(the MySQL JDBC driver must also be on the classpath, e.g. spark-shell --jars /path/to/mysql-connector-java.jar)

 val df_mysql=spark.read.format("jdbc").
 option("driver","com.mysql.jdbc.Driver").
 option("url","jdbc:mysql://localhost:3306").
 option("dbtable","test.patient").
 option("user","root").
 option("password","root").
 load()

 df_mysql.show

+---+---------+-----------+------+-------+
|pid|     name|       drug|gender|tot_amt|
+---+---------+-----------+------+-------+
|  1|saravanan|       avil|     M|    100|
|  2|  senthil|    metacin|     M|    200|
|  3|  Gowtham|paracetamol|     M|    300|
|  1|saravanan|       avil|     M|    100|
|  2|  senthil|    metacin|     M|    200|
|  3|  Gowtham|paracetamol|     M|    300|
+---+---------+-----------+------+-------+
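
The heading above mentions writing too; a sketch of the matching write calls (output paths and the target table are illustrative):

df_mysql.write.format("parquet").save("file:///home/osboxes/patient_parquet")
df_mysql.write.format("csv").save("file:///home/osboxes/patient_csv")

df_mysql.write.format("jdbc").
 option("driver","com.mysql.jdbc.Driver").
 option("url","jdbc:mysql://localhost:3306").
 option("dbtable","test.patient_copy").
 option("user","root").
 option("password","root").
 save()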







