Friday, February 15, 2019

SPARK SQL Internal Mechanism

Spark SQL concepts:

Important:

DF --> Java/Kryo serialization <> Catalyst optimizer <> lazy evaluation <> persistence <> file verification <> SQL operations <> schema <> structured data

DS --> Encoders <> Catalyst optimizer <> lazy evaluation <> persistence <> file verification <> non-SQL only in 1.x, both SQL and non-SQL operations from 2.x <> schema <> structured, semi-structured and unstructured data


RDD --> Java/Kryo serialization <> no optimizer <> lazy evaluation <> persistence <> no file verification <> non-SQL operations <> no schema <> unstructured data


From Spark 2.x onwards:

DS API = RDD API + DF API (the Dataset API supports both styles of operations)
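
A small sketch of what "both" means, assuming a spark-shell session (the case class and sample rows are just for illustration):

case class Person(age: Long, name: String)
import spark.implicits._

val ds = Seq(Person(25, "abc"), Person(55, "xyz")).toDS()

// RDD-style (typed, lambda-based) operations:
ds.filter(p => p.age > 30).map(p => p.name).show

// DF-style (untyped, SQL-like) operations on the same Dataset:
ds.select("name").where("age > 30").show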


JVM object ---> serialized object ---> rebuilt as a JVM object on the receiving node whenever data is shuffled between nodes. The benefit is that data moves across the cluster as compact serialized bytes, which also adds security.


RDD --> low-level programming
DF, DS --> high-level programming
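
A quick sketch of the difference, assuming a spark-shell session (the sample pairs are hypothetical):

// Low level: RDD transformations with explicit lambdas, no optimizer involved
val rdd = sc.parallelize(Seq((25L, "abc"), (55L, "xyz")))
rdd.filter { case (age, _) => age > 30 }.collect

// High level: declarative DataFrame API, planned by the Catalyst optimizer
import spark.implicits._
val df = rdd.toDF("age", "name")
df.where("age > 30").show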

The Spark web UI runs on port 4040 by default.

Serialization:
1) Java serialization
2) Kryo serialization ---> faster and more efficient serialization (used by DF, RDD)
3) Encoders (specific to DS)


RDD and DataFrame both fall under Java/Kryo serialization.
Dataset follows Encoders.
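
Kryo is not the default; it is usually switched on through configuration. A minimal sketch (either at spark-shell launch or in a SparkConf for a standalone app; the app name is hypothetical):

spark-shell --conf spark.serializer=org.apache.spark.serializer.KryoSerializer

// or, inside an application, before building the SparkSession:
import org.apache.spark.SparkConf
val conf = new SparkConf()
  .setAppName("kryo-demo")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
// conf.registerKryoClasses(Array(classOf[MyClass]))  // optional: MyClass is a placeholder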


Schema vs Schemaless Files


Parquet, Avro, ORC ==> schema files (data + schema)

CSV, JSON, XML ==> schemaless files (data only)
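
A sketch of how this matters on read (the paths are hypothetical): a schemaless CSV needs its schema inferred or supplied, while Parquet carries its own:

// CSV (schemaless): Spark must infer the types by scanning the data, or be given a schema
val csvDF = spark.read.option("inferSchema", "true").format("csv").load("file:///home/osboxes/csvout")
csvDF.printSchema

// Parquet (schema file): column names and types come from the file's own metadata
val pqDF = spark.read.format("parquet").load("file:///home/osboxes/parquetout")
pqDF.printSchema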


Internal mechanism:
Even when we run DataFrame or Dataset code, RDDs do the processing in the background (open the Spark UI and select the job to see this).

df, ds ===> Spark SQL engine ===> rdd

Inside the Spark SQL engine, the Catalyst optimizer is the decision maker that chooses the best approach.

For example, with df.select("*").where(..):
1st: the filter is applied,
2nd: less data is loaded ---> this ordering is determined by the Catalyst optimizer.

The Catalyst optimizer plans in stages: parsed logical plan --> analyzed logical plan --> optimized logical plan --> physical plan (which chooses the best approach).
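
These plans can be inspected directly in the spark-shell with explain(true), e.g. against the json DataFrame loaded in the CODE section below:

// Prints the parsed, analyzed and optimized logical plans plus the physical plan
df.select("*").where("age > 30").explain(true)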

CODE:


scala> val df=spark.read.format("json").load("file:///home/osboxes/jsonfile.json")
df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]
scala> df.show
+---+----+
|age|name|
+---+----+
| 25| abc|
| 55| xyz|
| 45| pqr|
+---+----+
scala> case class C1(age:Long,name:String)
defined class C1
scala> val ds=df.as[C1]
ds: org.apache.spark.sql.Dataset[C1] = [age: bigint, name: string]
scala> df
res1: org.apache.spark.sql.DataFrame = [age: bigint, name: string]
scala> ds.show
+---+----+
|age|name|
+---+----+
| 25| abc|
| 55| xyz|
| 45| pqr|
+---+----+

Various write operations

df.write.saveAsTable("default.jsontable")

spark-shell --packages com.databricks:spark-csv_2.10:1.5.0,com.databricks:spark-xml_2.10:0.4.1,org.apache.avro:avro:1.6.0

df.write.format("csv").save("file:///home/osboxes/csvout")

df.rdd.getNumPartitions

df.coalesce(1).write.format("csv").save("file:///home/osboxes/csvout1")

df.write.format("json").save("file:///home/osboxes/jsonout1")

df.write.format("xml").save("file:///home/osboxes/xmlout")

df.write.format("Parquet").save("file:///home/osboxes/parquetout")

df.write.format("com.databricks.spark.avro").save("file:///home/osboxes/avroout")

val df2=spark.read.format("com.databricks.spark.avro").load("file:///home/osboxes/avroout/")

df2.show

val rdd1=df3.rdd

val rdd2=ds3.rdd

df3.show

ds3.show

rdd1.collect.foreach(println)

rdd2.collect.foreach(println)

rdd1
rdd2

val rdf=rdd1.toDF()

val  ds=rdd1.toDS()

val rdf=rdd2.toDF()

val  ds=rdd2.toDS()

import org.apache.spark.sql.types._
val sch=StructType(StructField("age",LongType)::StructField("name",StringType)::Nil)

val df2=spark.createDataFrame(rdd1,sch)
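
As a self-contained sketch of the same idea (the Row values are hypothetical; the schema matches the json example above):

import org.apache.spark.sql.Row
import org.apache.spark.sql.types._

val rowRDD = sc.parallelize(Seq(Row(25L, "abc"), Row(55L, "xyz")))
val sch = StructType(StructField("age", LongType) :: StructField("name", StringType) :: Nil)
val df2 = spark.createDataFrame(rowRDD, sch)
df2.show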

scala> val df4=spark.read.table("default.webtable")

df4: org.apache.spark.sql.DataFrame = [ipAddress: string, port: string]

scala> df4.show

+---------------+---------+
|      ipAddress|     port|
+---------------+---------+
|   199.72.81.55|804571201|
| 199.120.110.21|804571209|
| 199.120.110.21|804571211|
|205.212.115.106|804571212|
| 129.94.144.152|804571213|
| 129.94.144.152|804571217|
| 199.120.110.21|804571217|
|   199.72.81.55|804571201|
| 199.120.110.21|804571209|
| 199.120.110.21|804571211|
|205.212.115.106|804571212|
| 129.94.144.152|804571213|
| 129.94.144.152|804571217|
| 199.120.110.21|804571217|
+---------------+---------+


