Spark SQL concepts:
Important:
DF  --> Java/Kryo serialization <> Catalyst optimizer <> lazy evaluation <> persistence <> file verification <> SQL operations <> schema <> structured data
DS  --> Encoders <> Catalyst optimizer <> lazy evaluation <> persistence <> file verification <> non-SQL operations in 1.x, SQL and non-SQL operations from 2.x <> schema <> structured, semi-structured and unstructured data
RDD --> Java/Kryo serialization <> no optimizer <> lazy evaluation <> persistence <> no file verification <> non-SQL operations <> no schema <> unstructured data
From Spark 2.x onwards, the Dataset API supports both styles of work (DS API = RDD API + DF API).
JVM object ---> serialized object ---> rebuilt as a JVM object on the receiving node whenever data is shuffled between nodes; the serialized form is compact and safer to move across the network.
RDD --> low-level programming API
DF, DS --> high-level programming APIs
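For example, the same average-age calculation in both styles (a minimal sketch for the spark-shell, where spark and sc already exist; the sample data is made up):

import org.apache.spark.sql.functions.avg
import spark.implicits._

// Low-level RDD API: describe HOW to compute, step by step.
val pairs = sc.parallelize(Seq(("abc", 25), ("xyz", 55), ("pqr", 45)))
val (ageSum, ageCount) = pairs.map { case (_, age) => (age, 1) }
                              .reduce { case ((s1, c1), (s2, c2)) => (s1 + s2, c1 + c2) }
println(ageSum.toDouble / ageCount)              // average age

// High-level DataFrame API: describe WHAT you want; Catalyst plans the rest.
val peopleDF = pairs.toDF("name", "age")
peopleDF.agg(avg("age")).show()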
The Spark UI runs on port 4040 by default.
Serialization:
1) Java serialization
2) Kryo serialization ---> faster and more efficient than Java serialization (used with RDD and DF)
3) Encoders (specific to DS)
RDD and DataFrame fall under Java/Kryo serialization.
Dataset uses Encoders.
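A sketch of how Kryo can be switched on when building a session in an application (in the spark-shell itself you would instead pass --conf spark.serializer=... on the command line); the Person class and app name here are just illustrations:

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

case class Person(age: Long, name: String)               // illustrative class to register

val conf = new SparkConf()
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")  // default is Java serialization
  .registerKryoClasses(Array(classOf[Person]))            // registering avoids writing full class names into the stream

val sparkSession = SparkSession.builder()
  .appName("kryo-demo")
  .master("local[*]")
  .config(conf)
  .getOrCreate()
// Note: Datasets still serialize their rows with Encoders regardless of this setting.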
Schema vs schemaless files
CSV, JSON, XML ==> schemaless files (data only)
Parquet, Avro, ORC ==> schema files (data + schema)
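For example (paths are illustrative; parquetout is the directory written further below), a CSV read has to infer or be given a schema, while a Parquet read picks it up from the file itself:

// CSV carries only data, so Spark must infer or be told the column types.
val csvDF = spark.read
  .option("header", "true")
  .option("inferSchema", "true")                 // extra pass over the data to guess types
  .csv("file:///home/osboxes/people.csv")

// Parquet stores the schema alongside the data.
val parquetDF = spark.read.parquet("file:///home/osboxes/parquetout")

csvDF.printSchema()
parquetDF.printSchema()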
Internal mechanism:
Even when we write DataFrame or Dataset code, RDDs are what actually run in the background (open the Spark UI and inspect the jobs).
df, ds ===> Spark SQL engine ===> RDD
Inside the Spark SQL engine, the Catalyst optimizer is the decision maker that chooses the best execution approach.
For example, for df.select("*").where(...):
1st: apply the filter
2nd: load only the reduced data ...> this ordering is decided by the Catalyst optimizer
Catalyst plans in stages: parsed logical plan --> analyzed logical plan --> optimized logical plan --> physical plans, from which the best one is chosen.
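These plans can be printed with explain(true) on any DataFrame, for example on the df created in the CODE block below:

// Prints the parsed logical plan, analyzed logical plan, optimized logical plan and physical plan.
// The optimized/physical plans show the filter being applied as early as possible.
df.select("*").where("age > 30").explain(true)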
CODE:
scala> val df=spark.read.format("json").load("file:///home/osboxes/jsonfile.json")
df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]
scala> df.show
+---+----+
|age|name|
+---+----+
| 25| abc|
| 55| xyz|
| 45| pqr|
+---+----+
scala> case class C1(age:Long,name:String)
defined class C1
scala> val ds=df.as[C1]
ds: org.apache.spark.sql.Dataset[C1] = [age: bigint, name: string]
scala> df
res1: org.apache.spark.sql.DataFrame = [age: bigint, name: string]
scala> ds.show
+---+----+
|age|name|
+---+----+
| 25| abc|
| 55| xyz|
| 45| pqr|
+---+----+
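Because ds is a Dataset[C1], it supports compile-time-checked, lambda-style (non-SQL) operations in addition to the SQL-style ones available on df:

import spark.implicits._                     // already in scope automatically in the spark-shell
ds.filter(_.age > 30).show()                 // typed filter on the case-class field
ds.map(c => c.name.toUpperCase).show()       // typed map; result is a Dataset[String]
df.filter($"age" > 30).show()                // equivalent untyped / SQL-style filter on the DataFrame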
Various write operations:
df.write.saveAsTable("default.jsontable")   // managed table in the default database
spark-shell --packages com.databricks:spark-csv_2.10:1.5.0,com.databricks:spark-xml_2.10:0.4.1,org.apache.avro:avro:1.6.0
df.write.format("csv").save("file:///home/osboxes/csvout")
df.rdd.getNumPartitions
df.coalesce(1).write.format("csv").save("file:///home/osboxes/csvout1")   // coalesce(1) gives a single output file
df.write.format("json").save("file:///home/osboxes/jsonout1")
df.write.format("xml").save("file:///home/osboxes/xmlout")
df.write.format("Parquet").save("file:///home/osboxes/parquetout")
df.write.format("com.databricks.spark.avro").save("file:///home/osboxes/avroout")
val df2=spark.read.format("com.databricks.spark.avro").load("file:///home/osboxes/avroout/")
df2.show
val rdd1=df3.rdd   // df3: any existing DataFrame (e.g. the df above); gives an RDD[Row]
val rdd2=ds3.rdd   // ds3: any existing Dataset (e.g. the ds above); gives an RDD of the case-class type
df3.show
ds3.show
rdd1.collect.foreach(println)
rdd2.collect.foreach(println)
rdd1
rdd2
val rdf1=rdd1.toDF()   // NOTE: fails for RDD[Row] (no implicit Row encoder) -- use spark.createDataFrame with a schema instead (below)
val ds1=rdd1.toDS()    // same limitation as above
val rdf2=rdd2.toDF()   // works: rdd2 holds case-class (C1) objects, covered by spark.implicits._
val ds2=rdd2.toDS()
import org.apache.spark.sql.types.{StructType, StructField, LongType, StringType}
val sch=StructType(StructField("age",LongType)::StructField("name",StringType)::Nil)
val df2=spark.createDataFrame(rdd1,sch)
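A self-contained sketch of both conversion routes (sample rows are made up; C1 is the case class defined earlier):

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StructType, StructField, LongType, StringType}
import spark.implicits._                                         // automatic in the spark-shell

// Route 1: RDD of case-class objects --> toDF()/toDS() through the implicit encoders.
val classRdd = sc.parallelize(Seq(C1(25L, "abc"), C1(55L, "xyz")))
val dfFromClass = classRdd.toDF()
val dsFromClass = classRdd.toDS()

// Route 2: RDD[Row] plus an explicit StructType schema --> spark.createDataFrame.
val rowRdd  = sc.parallelize(Seq(Row(25L, "abc"), Row(55L, "xyz")))
val schema2 = StructType(StructField("age", LongType) :: StructField("name", StringType) :: Nil)
val dfFromRows = spark.createDataFrame(rowRdd, schema2)
dfFromRows.show()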
scala> val df4=spark.read.table("default.webtable")
df4: org.apache.spark.sql.DataFrame = [ipAddress: string, port: string]
scala> df4.show
+---------------+---------+
| ipAddress| port|
+---------------+---------+
| 199.72.81.55|804571201|
| 199.120.110.21|804571209|
| 199.120.110.21|804571211|
|205.212.115.106|804571212|
| 129.94.144.152|804571213|
| 129.94.144.152|804571217|
| 199.120.110.21|804571217|
| 199.72.81.55|804571201|
| 199.120.110.21|804571209|
| 199.120.110.21|804571211|
|205.212.115.106|804571212|
| 129.94.144.152|804571213|
| 129.94.144.152|804571217|
| 199.120.110.21|804571217|
+---------------+---------+
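Since df4 is backed by a table, the same data can also be queried with plain SQL; a sketch (the view name weblog is just an example):

df4.createOrReplaceTempView("weblog")
spark.sql("SELECT ipAddress, COUNT(*) AS hits FROM weblog GROUP BY ipAddress ORDER BY hits DESC").show()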