Input file:
id,starttime,endtime,status
101,2014-01-01,2013-01-01,DROPPED
102,2014-01-01,2013-01-01,DROPPED
103,2014-01-01,2013-01-01,DROPPED
104,2014-01-01,2013-01-01,FAILED
105,2014-01-01,2013-01-01,FAILED
106,2014-01-01,2013-01-01,FAILED
107,2014-01-01,2013-01-01,SUCCESS
108,2014-01-01,2013-01-01,SUCCESS
109,2014-01-01,2013-01-01,SUCCESS
PySpark DataFrame Operations:
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *
sparkdriver=SparkSession.builder.master('local').appName('demoapp2').getOrCreate()
sparkdriver
#Reading through RDD
r1=sparkdriver.sparkContext.textFile("hdfs://127.0.0.1/user/sample/sample.csv")
r1.getNumPartitions()
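The RDD holds raw text lines. A minimal sketch of parsing it into a DataFrame (assuming the file contains the header row shown above; the column names are taken from that header):
#Drop the header line, split each line on ',' and convert to a DataFrame
header=r1.first()
df_from_rdd=r1.filter(lambda line: line!=header).\
map(lambda line: line.split(',')).\
toDF(['id','starttime','endtime','status'])
df_from_rdd.show(3)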
Shell checks for the HDFS host:
hostname ==> ubuntu (local machine name)
ping 127.0.0.1
When coding from Windows, use the namenode's IP address:
df_hdfs=sparkdriver.read.format('csv').option('delimiter',',').schema(sch).\
load('hdfs://172.16.38.131:8020/user/sample/sample.csv')
df_hdfs.show(5)
Without the .schema(sch) option, the columns would get default names:
+---+----------+----------+-------+
|_c0|       _c1|       _c2|    _c3|
+---+----------+----------+-------+
|101|2014-01-01|2013-01-01|DROPPED|
|102|2014-01-01|2013-01-01|DROPPED|
|103|2014-01-01|2013-01-01|DROPPED|
|104|2014-01-01|2013-01-01| FAILED|
|105|2014-01-01|2013-01-01| FAILED|
|106|2014-01-01|2013-01-01| FAILED|
|107|2014-01-01|2013-01-01|SUCCESS|
|108|2014-01-01|2013-01-01|SUCCESS|
|109|2014-01-01|2013-01-01|SUCCESS|
+---+----------+----------+-------+
With the schema applied, the columns get proper names:
+---+----------+----------+-------+
| id| starttime|   endtime| status|
+---+----------+----------+-------+
|101|2014-01-01|2013-01-01|DROPPED|
|102|2014-01-01|2013-01-01|DROPPED|
|103|2014-01-01|2013-01-01|DROPPED|
|104|2014-01-01|2013-01-01| FAILED|
|105|2014-01-01|2013-01-01| FAILED|
|106|2014-01-01|2013-01-01| FAILED|
|107|2014-01-01|2013-01-01|SUCCESS|
|108|2014-01-01|2013-01-01|SUCCESS|
|109|2014-01-01|2013-01-01|SUCCESS|
+---+----------+----------+-------+
hdfs dfs -ls /dfdata1
hdfs dfs -cat filename
file --> stored as 10 blocks in HDFS
spark --> the file is processed as 10 partitions
print(df_hdfs.rdd.getNumPartitions())
print(sparkdriver.sparkContext.defaultParallelism)
No. of blocks = No. of partitions
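The partition count is not fixed; a minimal sketch of changing it:
#repartition() changes the number of partitions with a full shuffle
df_16=df_hdfs.repartition(16)
print(df_16.rdd.getNumPartitions())   #==> 16
#coalesce() only merges partitions, so it avoids a full shuffle
df_2=df_hdfs.coalesce(2)
print(df_2.rdd.getNumPartitions())    #==> at most 2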
sch=StructType(
    [StructField('id',StringType()),
     StructField('starttime',StringType()),
     StructField('endtime',StringType()),
     StructField('status',StringType())
    ])
+---+----------+----------+-------+
| id| starttime|   endtime| status|
+---+----------+----------+-------+
|101|2014-01-01|2013-01-01|DROPPED|
|102|2014-01-01|2013-01-01|DROPPED|
|103|2014-01-01|2013-01-01|DROPPED|
|104|2014-01-01|2013-01-01| FAILED|
|105|2014-01-01|2013-01-01| FAILED|
|106|2014-01-01|2013-01-01| FAILED|
|107|2014-01-01|2013-01-01|SUCCESS|
|108|2014-01-01|2013-01-01|SUCCESS|
|109|2014-01-01|2013-01-01|SUCCESS|
+---+----------+----------+-------+
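As an aside, in Spark 2.3+ the same schema can also be given as a DDL-style string; a sketch, not from the original notes:
sch2='id string, starttime string, endtime string, status string'
df_ddl=sparkdriver.read.format('csv').option('delimiter',',').schema(sch2).\
load('hdfs://172.16.38.131:8020/user/sample/sample.csv')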
Spark (Scala --> JVM objects) ---- Py4J ---- Python objects
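The bridge can be seen directly; _jvm is an internal attribute, shown here only to illustrate the architecture:
jvm=sparkdriver.sparkContext._jvm   #py4j gateway into the JVM
print(jvm.java.lang.System.currentTimeMillis())   #calls a JVM method from Python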
df_hdfs.where('status="SUCCESS"').show(5)
df_hdfs.withColumn('fromno',col('fromno').cast('long')).select('fromno',col('fromno')/10).show()
df_hdfs.withColumn('fromno',col('fromno').cast('long')).withColumn('fromnonew',col('fromno')/10).show()
df_hdfs.select('fromno',col('fromno')/10).show()
df_hdfs.select('status').distinct().show()
df_hdfs.withColumn('severity',when(col('status')=="DROPPED",'critical').\
when(col('status')=="FAILED",'Medium').otherwise('ok')).show(10)
df_hdfs.withColumn('severity',when(col('status')=="DROPPED",2).\
when(col('status')=="FAILED",1).otherwise(0)).show(10)
withColumn adds a new column; if the column already exists, its values are replaced:
df2_hdfs=df_hdfs.withColumn('severity',when(col('status')=="DROPPED",2).\
when(col('status')=="FAILED",1).otherwise(0))
df2_hdfs.show(5)
+---+----------+----------+-------+------+--------+
| id| starttime|   endtime| status|fromno|severity|
+---+----------+----------+-------+------+--------+
|101|2014-01-01|2013-01-01|DROPPED|   101|       2|
|102|2014-01-01|2013-01-01|DROPPED|   102|       2|
|103|2014-01-01|2013-01-01|DROPPED|   103|       2|
|104|2014-01-01|2013-01-01| FAILED|   104|       1|
|105|2014-01-01|2013-01-01| FAILED|   105|       1|
+---+----------+----------+-------+------+--------+
df2_hdfs=df_hdfs.withColumn('status',when(col('status')=="DROPPED",2).\
when(col('status')=="FAILED",1).otherwise(0))
df2_hdfs.show(5)
+---+----------+----------+------+------+
| id| starttime|   endtime|status|fromno|
+---+----------+----------+------+------+
|101|2014-01-01|2013-01-01|     2|   101|
|102|2014-01-01|2013-01-01|     2|   102|
|103|2014-01-01|2013-01-01|     2|   103|
|104|2014-01-01|2013-01-01|     1|   104|
|105|2014-01-01|2013-01-01|     1|   105|
+---+----------+----------+------+------+
df_hdfs.withColumn('status',when(col('status')=="DROPPED",2).\
when(col('status')=="FAILED",1).otherwise(0)).show(10)
To add a column with the same literal value for every row, use lit():
df_hdfs2=df2_hdfs.withColumn('cnew',lit(200))
df_hdfs2.show(5)
+---+----------+----------+-------+------+--------+----+
| id| starttime|   endtime| status|fromno|severity|cnew|
+---+----------+----------+-------+------+--------+----+
|101|2014-01-01|2013-01-01|DROPPED|   101|       2| 200|
|102|2014-01-01|2013-01-01|DROPPED|   102|       2| 200|
|103|2014-01-01|2013-01-01|DROPPED|   103|       2| 200|
|104|2014-01-01|2013-01-01| FAILED|   104|       1| 200|
|105|2014-01-01|2013-01-01| FAILED|   105|       1| 200|
+---+----------+----------+-------+------+--------+----+
#Rename an existing column
df_hdfs4=df_hdfs.withColumnRenamed('fromno','diallingno')
#e.g. renaming 'severity' to 'SEVERITY' on df2_hdfs gives:
df2_hdfs.withColumnRenamed('severity','SEVERITY').show(5)
+---+----------+----------+-------+------+--------+
| id| starttime|   endtime| status|fromno|SEVERITY|
+---+----------+----------+-------+------+--------+
|101|2014-01-01|2013-01-01|DROPPED|   101|       2|
|102|2014-01-01|2013-01-01|DROPPED|   102|       2|
|103|2014-01-01|2013-01-01|DROPPED|   103|       2|
|104|2014-01-01|2013-01-01| FAILED|   104|       1|
|105|2014-01-01|2013-01-01| FAILED|   105|       1|
+---+----------+----------+-------+------+--------+
#Drop columns
df2_hdfs.drop('id','starttime').show(5) ==> Spark 2.x can drop multiple columns in one call; Spark 1.x drops only one column at a time
+----------+-------+------+--------+
|   endtime| status|fromno|severity|
+----------+-------+------+--------+
|2013-01-01|DROPPED|   101|       2|
|2013-01-01|DROPPED|   102|       2|
|2013-01-01|DROPPED|   103|       2|
|2013-01-01| FAILED|   104|       1|
|2013-01-01| FAILED|   105|       1|
+----------+-------+------+--------+
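In Spark 1.x the same result needs one drop() call per column; a minimal sketch:
#Spark 1.x style: drop one column per call
df_old=df2_hdfs
for c in ['id','starttime']:
    df_old=df_old.drop(c)
df_old.show(5)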
DataFrames --> DSL (domain-specific language)
tables --> pure SQL
We can create tables (temporary or permanent) from DataFrames if we want to use SQL:
df_hdfs.registerTempTable('temptable1')
sparkdriver.sql('select status,count(*) from temptable1 group by status').show()
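The same aggregation in the DataFrame DSL, without registering a table:
df_hdfs.groupBy('status').count().show()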
df_hdfs2.select('severity').printSchema()
sparkdriver.sql('create database db1').show()
sparkdriver.sql('show databases').show()
sparkdriver.sql('show tables in default').show()
df_hdfs.write.saveAsTable('db1.permtable')
sparkdriver.sql('show tables in db1').show()
Databases and tables ---> permanent
Storage: the local file system (spark-warehouse directory); a metastore comes built in from Spark 2.0
Metastore --> embedded Derby by default
Creating permanent tables is not supported in Spark 1.x without Hive integration, but it works out of the box in Spark 2.x.
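The warehouse location can be overridden when the session is first built; a sketch (the path is only an example, and the config must be set before any SparkSession exists):
spark2=SparkSession.builder.master('local').appName('demoapp2').\
config('spark.sql.warehouse.dir','/tmp/my-warehouse').\
getOrCreate()   #getOrCreate() returns an existing session unchanged if one is already running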
df_hdfs.createOrReplaceTempView('temptable2') -->the newer API (registerTempTable is deprecated)
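Once registered, the view is queryable like any table:
sparkdriver.sql('select * from temptable2 where status="SUCCESS"').show()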