Tuesday, February 26, 2019

PYSPARK HDFS Operations

Input file:

id,starttime,endtime,status
101,2014-01-01,2013-01-01,DROPPED
102,2014-01-01,2013-01-01,DROPPED
103,2014-01-01,2013-01-01,DROPPED
104,2014-01-01,2013-01-01,FAILED
105,2014-01-01,2013-01-01,FAILED
106,2014-01-01,2013-01-01,FAILED
107,2014-01-01,2013-01-01,SUCCESS
108,2014-01-01,2013-01-01,SUCCESS
109,2014-01-01,2013-01-01,SUCCESS

PySpark DataFrame Operations:

from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *
sparkdriver=SparkSession.builder.master('local').appName('demoapp2').getOrCreate()
sparkdriver

#Reading through RDD
r1=sparkdriver.sparkContext.textFile("hdfs://127.0.0.1/user/sample/sample.csv")

r1.getNumPartitions()
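
A minimal sketch (same HDFS path as above, and assuming the first line of the file is the header) of turning the text RDD into a DataFrame by splitting each line on the comma:

#Sketch: parse the CSV lines from the RDD and convert to a DataFrame
header=r1.first()
rows=r1.filter(lambda line: line!=header).map(lambda line: line.split(','))
df_from_rdd=rows.toDF(['id','starttime','endtime','status'])
df_from_rdd.show(3)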

hostname   ==> prints the local machine's hostname (Ubuntu)

ping 127.0.0.1

Reading into a DataFrame when coding on Windows (remote NameNode; the sample file is comma-delimited, and sch is the schema defined further below):
df_hdfs=sparkdriver.read.format('csv').option('delimiter',',').schema(sch).\
load('hdfs://172.16.38.131:8020/user/sample/sample.csv')

df_hdfs.show(5)

+---+----------+----------+-------+
|_c0|       _c1|       _c2|    _c3|
+---+----------+----------+-------+
|101|2014-01-01|2013-01-01|DROPPED|
|102|2014-01-01|2013-01-01|DROPPED|
|103|2014-01-01|2013-01-01|DROPPED|
|104|2014-01-01|2013-01-01| FAILED|
|105|2014-01-01|2013-01-01| FAILED|
|106|2014-01-01|2013-01-01| FAILED|
|107|2014-01-01|2013-01-01|SUCCESS|
|108|2014-01-01|2013-01-01|SUCCESS|
|109|2014-01-01|2013-01-01|SUCCESS|
+---+----------+----------+-------+

With column names applied (via a schema or header), the same data shows as:

+---+----------+----------+-------+
| id| starttime|   endtime| status|
+---+----------+----------+-------+
|101|2014-01-01|2013-01-01|DROPPED|
|102|2014-01-01|2013-01-01|DROPPED|
|103|2014-01-01|2013-01-01|DROPPED|
|104|2014-01-01|2013-01-01| FAILED|
|105|2014-01-01|2013-01-01| FAILED|
|106|2014-01-01|2013-01-01| FAILED|
|107|2014-01-01|2013-01-01|SUCCESS|
|108|2014-01-01|2013-01-01|SUCCESS|
|109|2014-01-01|2013-01-01|SUCCESS|
+---+----------+----------+-------+
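
The first block above is the output when no schema is supplied (Spark falls back to _c0.._c3); the second has real column names. A hedged alternative, since the sample file carries a header row, is to let Spark pick the names up itself:

#Sketch: read using the header row instead of an explicit schema
#(inferSchema triggers an extra pass over the data)
df_hdr=sparkdriver.read.format('csv').option('header','true').\
option('inferSchema','true').\
load('hdfs://172.16.38.131:8020/user/sample/sample.csv')
df_hdr.printSchema()
df_hdr.show(5)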

hdfs dfs -ls /dfdata1
hdfs dfs -cat filename

file --> 10 HDFS blocks
spark(file) --> processed as 10 partitions

print(df_hdfs.rdd.getNumPartitions())

print(sparkdriver.sparkContext.defaultParallelism)

No of blocks = No of partitions
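
A small sketch of checking this (same paths as above); textFile also accepts a minPartitions hint if more splits are wanted than there are blocks:

#Sketch: one partition per HDFS block by default
print(df_hdfs.rdd.getNumPartitions())
r2=sparkdriver.sparkContext.textFile('hdfs://127.0.0.1/user/sample/sample.csv',minPartitions=4)
print(r2.getNumPartitions())   #at least 4 splits requested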

sch=StructType([
    StructField('id',StringType()),
    StructField('starttime',StringType()),
    StructField('endtime',StringType()),
    StructField('status',StringType())
    ])

+---+----------+----------+-------+
| id| starttime|   endtime| status|
+---+----------+----------+-------+
|101|2014-01-01|2013-01-01|DROPPED|
|102|2014-01-01|2013-01-01|DROPPED|
|103|2014-01-01|2013-01-01|DROPPED|
|104|2014-01-01|2013-01-01| FAILED|
|105|2014-01-01|2013-01-01| FAILED|
|106|2014-01-01|2013-01-01| FAILED|
|107|2014-01-01|2013-01-01|SUCCESS|
|108|2014-01-01|2013-01-01|SUCCESS|
|109|2014-01-01|2013-01-01|SUCCESS|
+---+----------+----------+-------+
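
The table above is the read repeated with the corrected schema applied. A short sketch of that call (read.csv is the shorthand form; header=True skips the header line since an explicit schema is supplied):

#Sketch: apply the schema and confirm the column types
df_hdfs=sparkdriver.read.csv('hdfs://172.16.38.131:8020/user/sample/sample.csv',schema=sch,sep=',',header=True)
df_hdfs.printSchema()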
    
Spark (Scala -> JVM objects) ---- Py4J ---- Python objects


df_hdfs.where('status="SUCCESS"').show(5)
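The same filter written with a column expression instead of a SQL string (where and filter are interchangeable):

df_hdfs.filter(col('status')=='SUCCESS').show(5)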

df_hdfs.withColumn('fromno',col('fromno').cast('long')).select('fromno',col('fromno')/10).show()

df_hdfs.withColumn('fromno',col('fromno').cast('long')).withColumn('fromnonew',col('fromno')/10).show()

df_hdfs.select('fromno',col('fromno')/10).show()

df_hdfs.select('status').distinct().show()

df_hdfs.withColumn('severity',when(col('status')=="DROPPED",'critical').
when(col('status')=="Failed",'Medium').otherwise('ok')).show(10)

df_hdfs.withColumn('severity',when(col('status')=="DROPPED",2).
when(col('status')=="Failed",1).otherwise(0)).show(10)

To keep the derived column, assign the result to a new DataFrame:
df2_hdfs=df_hdfs.withColumn('severity',when(col('status')=="DROPPED",2).\
when(col('status')=="FAILED",1).otherwise(0))

df2_hdfs.show(5)

+---+----------+----------+-------+------+--------+
| id| starttime|   endtime| status|fromno|severity|
+---+----------+----------+-------+------+--------+
|101|2014-01-01|2013-01-01|DROPPED|   101|       2|
|102|2014-01-01|2013-01-01|DROPPED|   102|       2|
|103|2014-01-01|2013-01-01|DROPPED|   103|       2|
|104|2014-01-01|2013-01-01| FAILED|   104|       1|
|105|2014-01-01|2013-01-01| FAILED|   105|       1|
+---+----------+----------+-------+------+--------+

If the column name already exists, withColumn replaces that column in place:

df2_hdfs=df_hdfs.withColumn('status',when(col('status')=="DROPPED",2).\
when(col('status')=="FAILED",1).otherwise(0))

df2_hdfs.show(5)

+---+----------+----------+------+------+
| id| starttime|   endtime|status|fromno|
+---+----------+----------+------+------+
|101|2014-01-01|2013-01-01|     2|   101|
|102|2014-01-01|2013-01-01|     2|   102|
|103|2014-01-01|2013-01-01|     2|   103|
|104|2014-01-01|2013-01-01|     1|   104|
|105|2014-01-01|2013-01-01|     1|   105|
+---+----------+----------+------+------+

df_hdfs.withColumn('status',when(col('status')=="DROPPED",2).
when(col('status')=="Failed",1).otherwise(0)).show(10)

To add a column with the same literal value for every row, use lit():

df_hdfs2=df_hdfs.withColumn('cnew',lit(200))

df_hdfs2.show(5)
+---+----------+----------+-------+------+--------+----+
| id| starttime|   endtime| status|fromno|severity|cnew|
+---+----------+----------+-------+------+--------+----+
|101|2014-01-01|2013-01-01|DROPPED|   101|       2| 200|
|102|2014-01-01|2013-01-01|DROPPED|   102|       2| 200|
|103|2014-01-01|2013-01-01|DROPPED|   103|       2| 200|
|104|2014-01-01|2013-01-01| FAILED|   104|       1| 200|
|105|2014-01-01|2013-01-01| FAILED|   105|       1| 200|
+---+----------+----------+-------+------+--------+----+

#Rename an existing column

dfs_hdfs4=df_hdfs.withColumnRenamed('fromno','diallingno')

+---+----------+----------+-------+------+--------+
| id| starttime|   endtime| status|fromno|SEVERITY|
+---+----------+----------+-------+------+--------+
|101|2014-01-01|2013-01-01|DROPPED|   101|       2|
|102|2014-01-01|2013-01-01|DROPPED|   102|       2|
|103|2014-01-01|2013-01-01|DROPPED|   103|       2|
|104|2014-01-01|2013-01-01| FAILED|   104|       1|
|105|2014-01-01|2013-01-01| FAILED|   105|       1|
+---+----------+----------+-------+------+--------+

#Drop columns

df_hdfs.drop('id','starttime').show(5)   ==> Spark 2.x can drop multiple columns in one call; Spark 1.x drops only one column per call (a loop-based sketch follows the output below)
+----------+-------+------+--------+
|   endtime| status|fromno|severity|
+----------+-------+------+--------+
|2013-01-01|DROPPED|   101|       2|
|2013-01-01|DROPPED|   102|       2|
|2013-01-01|DROPPED|   103|       2|
|2013-01-01| FAILED|   104|       1|
|2013-01-01| FAILED|   105|       1|
+----------+-------+------+--------+
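
A Spark 1.x-compatible sketch of the same drop, removing one column per call in a loop (column names taken from the example above):

#Sketch: Spark 1.x drop() takes a single column, so loop over the names
df_dropped=df_hdfs
for c in ['id','starttime']:
    df_dropped=df_dropped.drop(c)
df_dropped.show(5)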

DataFrames --> DSL (domain-specific language)
tables --> pure SQL


We can create tables (temporary or permanent) from DataFrames if we want to use SQL:

df_hdfs.registerTempTable('temptable1')

sparkdriver.sql('select status,count(*) from temptable1 group by status').show()
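
The same aggregation in the DataFrame DSL, without registering a table:

#Sketch: DSL equivalent of the SQL query above
df_hdfs.groupBy('status').count().show()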

df_hdfs2.select('severity').printSchema()

sparkdriver.sql('create database db1').show()

sparkdriver.sql('show databases').show()

sparkdriver.sql('show tables in default').show()

df_hdfs.write.saveAsTable('db1.permtable')

sparkdriver.sql('show tables in db1').show()
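
A short usage sketch: the permanent table can be read back later, even from a new SparkSession:

#Sketch: read the saved table back
sparkdriver.table('db1.permtable').show(5)
sparkdriver.sql('select count(*) from db1.permtable').show()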

Databases and tables created this way are permanent.

Storage system --> local file system (spark-warehouse directory); a metastore is available from Spark 2.0 onwards
Metastore --> embedded Derby by default


Creation of permanent tables is not supported in Spark 1.x without Hive integration, but it is supported out of the box in Spark 2.x.
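
A minimal sketch of the Spark 2.x session settings involved; the warehouse path here is an assumption, and enableHiveSupport() switches on the Hive-backed metastore (embedded Derby by default):

#Sketch: session with an explicit warehouse directory (path is an assumption)
spark2=SparkSession.builder.appName('demoapp2').\
config('spark.sql.warehouse.dir','/tmp/spark-warehouse').\
enableHiveSupport().\
getOrCreate()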

df_hdfs.createOrReplaceTempView('temptable2')   --> the newer API (registerTempTable is deprecated)
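
A quick usage sketch of the new view (it lives only for this SparkSession):

sparkdriver.sql('select status,count(*) from temptable2 group by status').show()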
