Wednesday, April 10, 2019

DataFrame Operations in Scala

spark-shell
val df1=sqlContext.read.format("json").load("file:///home/cloudera/jsonfile1")
val df2=sqlContext.read.format("json").load("file:///home/cloudera/jsonfile2")
val df3=sqlContext.read.format("json").load("file:///home/cloudera/jsonfile3")

df2.select("age","name","address").show()
df2.select("age","name","address.pin").show()
df2.select("age","name","address.pin","address.street").show()

val r1=df1.rdd
val r1=sc.makeRDD(Array(2,3,4,5,6))
val r1=sc.parallelize(Array(2,3,4,5,6))
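
makeRDD is just an alias for parallelize; both distribute a local collection across the cluster. A minimal sketch (rp and the partition count 2 are illustrative choices):

val rp=sc.parallelize(Array(2,3,4,5,6),2)
rp.getNumPartitions
rp.collect.foreach(println)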

val df1=sqlContext.createDataFrame(Array((1,2),(2,3),(3,4)))
val df2=sqlContext.createDataFrame(Seq((1,2),(2,3),(3,4)))

val ds1=sqlContext.createDataset(Array((2,"hyd"),(3,"chn"),(4,"del")))
ds1.show
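
By default the tuple columns come out as _1 and _2. With sqlContext.implicits._ in scope (it is in spark-shell), toDF can assign names; "id" and "value" below are just illustrative choices:

import sqlContext.implicits._
val named=Seq((1,2),(2,3),(3,4)).toDF("id","value")
named.printSchema
named.show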

df1.show


val r2=df2.rdd
r1
r2
val d1=r1.toDF
val d2=r2.toDF    // fails for an RDD[Row]: there is no implicit encoder for Row; use createDataFrame with a schema (below)


import org.apache.spark.sql.types._
val r1=df1.rdd            // back to the Row RDD taken from df1
r1.collect.foreach(println)

val sch1=StructType(StructField("id",IntegerType)::StructField("name",StringType)::Nil)
val d1=sqlContext.createDataFrame(r1,sch1)

d1.show()
r2.collect.foreach(println)

val sch2=StructType(StructField("address",StructType(StructField("pincode",LongType)::StructField("street",StringType)::Nil))::
  StructField("age",LongType)::StructField("name",StringType)::Nil)

val d2=sqlContext.createDataFrame(r2,sch2)
d2.show

case class C1(age:Long,name:String)
val ds1=df1.as[C1]
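
With df1 typed as Dataset[C1], lambdas can use the case-class fields directly instead of positional Row access. A small sketch, assuming the JSON really carries age and name:

ds1.filter(x=>x.age>25).show
ds1.map(x=>x.name.toUpperCase).show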

case class Address(pin:Long,street:String)
case class C2(address:Address,age:Long,name:String)
val r3=ds1.rdd

cat>sampledata
2,3.4,2017-01-12 12:23:45
3,5.66,2018-10-22 23:44:56
4,7.44,2016-09-01 12:12:12
5,6.88,2011-11-19 14:32:45

cat sampledata
val df1=sqlContext.read.format("csv").option("inferSchema","true").load("sampledata")
df1.show()

val df2=df1.withColumnRenamed("_c0","id").
  withColumnRenamed("_c1","id2").
  withColumnRenamed("_c2","Timestamp")

df2.select(col("id"),(floor(col(:id2)).ceil(col("id2")),year(col("Timestamp")).
minute(col("Timestamp")),second(col("Timestamp")).show

case class C1(id:Int,id2:Double,Timestamp:java.sql.Timestamp)

val ds2=df2.as[C1]
df2.show

val df4=df2.select(col("id"),col("id2"),current_date)
val df4=df2.select(col("id"),col("id2"),col("TimeStamp"),current_timestamp,current_date)

df4.select(datediff(col("current_timestamp()"),col("Timestamp"))).show
df4.select(col("current_date()"),date_add(col("current_date()"),1)).show

case class C2(id:Int,id2:Double,Timestamp:java.sql.Timestamp,TS:java.sql.Timestamp,DT:java.sql.Date)

val df5=df4.withColumnRenamed("current_timestamp()","TS").withColumnRenamed("current_date()","DT")
val ds5=df5.as[C2]

ds5.show()
val df6=df5.select("TS")
df6.show

df6.printSchema
val ds6=df6.as[java.sql.Timestamp]
val df7=df5.select("id2")    // df7 was not defined above; a single Double column fits the as[Double] below
val ds7=df7.as[Double]

import sqlContext.implicits._
case class C3(TS:java.sql.Timestamp)
val ds6=df6.as[C3]
val d7=ds7.toDF()
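
Instead of relying on the implicit encoders, the same conversions can pass an explicit encoder from org.apache.spark.sql.Encoders; the names ds6b and ds7b below are just illustrative:

import org.apache.spark.sql.Encoders
val ds6b=df6.as(Encoders.TIMESTAMP)
val ds7b=df7.as(Encoders.scalaDouble)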

val rdd_ds=ds5.rdd
val rdd_df=df5.rdd
rdd_ds.collect.foreach(println)
rdd_df.collect.foreach(println)
val ds_rdd=rdd_ds.toDS()


val df_rdd=rdd_df.toDF()    // fails for an RDD[Row]: there is no implicit encoder for Row
The schema is not preserved when a DataFrame is taken through an RDD[Row], so capture it first:
val sch=df5.schema

val df_rdd=sqlContext.createDataFrame(rdd_df,sch)
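A quick check that supplying the captured schema really restores what toDF could not:

df5.printSchema
df_rdd.printSchema
df5.schema==df_rdd.schema    // true: the round trip preserved names and types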
val rd1=sc.makeRDD(Array((1,2),(2,3),(3,4)))
rd1.collect.foreach(println)

val df1=rd1.toDF()
rdd_df
rdd_df.collect.foreach(println)

import org.apache.spark.sql.types._
val schema=StructType(StructField("id1",IntegerType)::StructField("id2".DoubleType)::StructField
("TS1",TimeStampType)::StructField("TS2",TimeStampType)::StructField(TS2,TimeStampType,true,StructField(DT,DateTyoe,true))
val df2=sqlContext.createDataFrame(rdd_df,schma)

cat>jsonsample
{"id":1,"city":"hyd","marks":[20,30,40]}
val df=sqlContext.read.format("json").load("")
df5.show
df5.printSchema
rd=df.rdd
rd..collect.foreach(println)
val schema3=StructType(StructField("city",StringType):StructField("id",LongType)::
StructField("marks",ArrayType(LongType)).Nil)

val df=sqlContext.createDataFrame(rd,schema3)
df.show
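
Since marks is an ArrayType column, it can be flattened with explode from org.apache.spark.sql.functions; one output row per array element:

import org.apache.spark.sql.functions._
df.select(col("id"),col("city"),explode(col("marks")).as("mark")).show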

Table --> class
Schema --> definition
Rows --> objects

case class Student(name:String,id:Int,age:Int,course:String,Gender:String)
defined class Student

val a1=new Student("karnakar",201,34,"Spark","male")
val a2=new Student("rajesh",202,30,"hadoop","male")
val a3=new Student("priya",203,21,"scala","female")
val df1=sqlContext.createDataFrame(Array(a1,a2,a3))
df1.show()

val ds1=sqlContext.createDataset(Array(a1,a2,a3))
ds1.show
df1.collect
ds1.collect

df - the case-class type is not preserved: collect returns generic Row objects
ds - the type is preserved: collect returns Student objects

ds1.map(x=>(x.name,x.Gender)).show
df1.map(x=>(x.name,x.age))    // fails: a DataFrame row is a generic Row with no case-class fields
df1.map(x=>(x(0),x(1))).collect.foreach(println)
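
Row fields can also be read by name with getAs, which avoids the bare x(0)/x(1) indexing; a sketch over the same df1:

df1.rdd.map(x=>(x.getAs[String]("name"),x.getAs[Int]("age"))).collect.foreach(println)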


val df2=sqlContext.createDataFrame(Seq(new Student("shekar",204,32,"spark","male"),
  new Student("Harini",205,29,"scala","female")))
df2.show

val newRow=Seq(new Student("shankar",206,30,"hadoop","male"))
val newRow=Seq(new Student("shankar",206,30,"hadoop","male")).toDF
val df3=df2.unionAll(newRow)

df3.show
df1.show
df3.show

val d1=df1.select("age")
val d2=df2.select("age")

d1.join(d2,d1("age")===d2("age"),"inner").show
d1.join(d2,d1("age")===d2("age"),"fullOuter").show

d1.registerTempTable("t1")
d2.registerTempTable("t2")
d1.show
d2.show
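
With t1 and t2 registered, the same comparison can be written in SQL through sqlContext:

sqlContext.sql("select t1.age,t2.age from t1 join t2 on t1.age=t2.age").show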

val r1=d1.rdd
val r2=d2.rdd
r1.collect.foreach(println)
r2.collect.foreach(println)

val r3=r1.map(x=>x.toString).map(x=>x.slice(1,x.length-1).toInt).repartition(2)
val r4=r2.map(x=>x.toString).map(x=>x.slice(1,x.length-1).toInt).repartition(2)
val r5=r3.zip(r4)    // zip needs both RDDs to have the same number of partitions and elements per partition

r5.collect.foreach(println)
val df3=r5.toDF("age1","age2")
df3.where("age1=age2").show

df3.filter(x=>x(0)==x(1)).show
case class Ages(age1:Int,age2:Int)    // ds3 was never defined above; a typed view of df3 makes the map below work
val ds3=df3.as[Ages]
ds3.map(x=>if(x.age1>x.age2) "yes" else "no").show
