PySpark Use Case 2
------------------
How to upgrade the Spark version:
pip install pyspark==2.3.2
from pyspark.sql import SparkSession
spark=SparkSession.builder.master('local').appName('sparkapp').getOrCreate()
spark
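To double-check which Spark version the session is actually running (a quick sanity check, not part of the original steps):
print(spark.version)          # version of the active Spark session
import pyspark
print(pyspark.__version__)    # the installed pyspark package reports the same number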
Sample.txt
----------
spark spark spark spark spark spark spark
spark spark spark spark spark
spark
spark spark spark
r1=spark.sparkContext.textFile("c:\\Users\\Nethra\\Desktop\\sample.txt")
print(r1.collect())
[u'spark spark spark spark spark spark spark', u'spark spark spark spark spark ', u'spark', u'spark spark spark']
print(r1.count())
4
r2=r1.map(lambda x: str(x)).map(lambda x: x.split(" "))
print(r2.collect())
[['spark', 'spark', 'spark', 'spark', 'spark', 'spark', 'spark'], ['spark', 'spark', 'spark', 'spark', 'spark', ''], ['spark'], ['spark', 'spark', 'spark']]
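As a side note (not used further in this use case), flatMap would flatten the per-line word lists into a single RDD of words instead of keeping one list per line:
words=r1.flatMap(lambda x: str(x).split(" "))
print(words.count())   # 17 here, counting the empty string produced by the trailing space on line 2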
Get the number of columns:
--------------------------
r3=r2.map(lambda x:len(x))
print(r3.collect())
[7, 6, 1, 3]
Maximum number of columns in the list:
---------------------------------------
columnsize=max(r3.collect())
print(columnsize)
7
# a function to fill the missing words with "nodata"
# input is a list of variable size; output is a list of fixed size (columnsize)
def fillmissing(x):
    result = []
    for i in range(columnsize):
        try:
            result.append(x[i])
        except IndexError:
            result.append('nodata')
    return result
print(fillmissing(['spark','spark']))
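With columnsize equal to 7, this prints ['spark', 'spark', 'nodata', 'nodata', 'nodata', 'nodata', 'nodata']. The same padding can also be written as a one-line list expression; the name padded below is only illustrative:
# equivalent to fillmissing: append 'nodata' until each row has columnsize entries
padded=r2.map(lambda x: x + ['nodata'] * (columnsize - len(x)))
print(padded.collect())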
resultdata=r2.map(lambda x:fillmissing(x))
print(resultdata.collect())
[['spark', 'spark', 'spark', 'spark', 'spark', 'spark', 'spark'], ['spark', 'spark', 'spark', 'spark', 'spark', '', 'nodata'], ['spark', 'nodata', 'nodata', 'nodata', 'nodata', 'nodata', 'nodata'], ['spark', 'spark', 'spark', 'nodata', 'nodata', 'nodata', 'nodata']]
# Convert RDD into dataframe
df1=resultdata.toDF(['c1','c2','c3','c4','c5','c6','c7'])
df1.show()
+-----+------+------+------+------+------+------+
|   c1|    c2|    c3|    c4|    c5|    c6|    c7|
+-----+------+------+------+------+------+------+
|spark| spark| spark| spark| spark| spark| spark|
|spark| spark| spark| spark| spark|      |nodata|
|spark|nodata|nodata|nodata|nodata|nodata|nodata|
|spark| spark| spark|nodata|nodata|nodata|nodata|
+-----+------+------+------+------+------+------+
Read a CSV file into a DataFrame:
---------------------------------
df2=spark.read.format('csv').option('inferSchema',True).option('header',False).load("file:///home/hadoop/sample1.csv")
df2.show(5)
+---+---+-------------------+
|_c0|_c1|                _c2|
+---+---+-------------------+
|  1|hyd|2013-12-24 23:14:45|
|  2|chn|2018-11-30 11:12:34|
+---+---+-------------------+
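Relying on inferSchema makes Spark scan the file to guess each column's type. An alternative sketch, assuming the same three columns, is to supply an explicit schema; the names id, city and event_time are made up here:
from pyspark.sql.types import StructType,StructField,IntegerType,StringType,TimestampType
schema=StructType([StructField('id',IntegerType(),True),
                   StructField('city',StringType(),True),
                   StructField('event_time',TimestampType(),True)])
df2a=spark.read.format('csv').schema(schema) \
        .option('timestampFormat','yyyy-MM-dd HH:mm:ss') \
        .load("file:///home/hadoop/sample1.csv")
df2a.printSchema()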
from pyspark.sql.functions import sqrt
from pyspark.sql.types import IntegerType
# general pattern: df.select('Age', sqrt('Age')).show() -- applied to df2 below
>>> df2.select('_c0',sqrt('_c0')).show()
+---+------------------+
|_c0|         SQRT(_c0)|
+---+------------------+
|  1|               1.0|
|  2|1.4142135623730951|
+---+------------------+
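The same projection can also be written with selectExpr, which takes SQL expression strings (just an equivalent form, not from the original run):
df2.selectExpr('_c0','sqrt(_c0)').show()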
# create a UDF
def f2(x):
    return (x*x - x)//2    # integer division keeps the result an int for IntegerType below

print(f2(20))
df3=df2.na.drop()
df3.show()
+---+---+-------------------+
|_c0|_c1|                _c2|
+---+---+-------------------+
|  1|hyd|2013-12-24 23:14:45|
|  2|chn|2018-11-30 11:12:34|
+---+---+-------------------+
df3=df2.dropna()
df3.show()
+---+---+-------------------+
|_c0|_c1|                _c2|
+---+---+-------------------+
|  1|hyd|2013-12-24 23:14:45|
|  2|chn|2018-11-30 11:12:34|
+---+---+-------------------+
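na.drop() and dropna() are the same operation. Both also accept how, thresh and subset arguments; a sketch of the variants (the output would be unchanged here, since this data has no nulls):
df2.na.drop(how='all').show()        # drop a row only when every column is null
df2.dropna(thresh=2).show()          # keep rows that have at least 2 non-null values
df2.na.drop(subset=['_c1']).show()   # look only at _c1 when deciding what to drop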
# Registering the UDF with the SparkSQL function registry and the DataFrame API
from pyspark.sql.functions import udf
spark.udf.register('myfunc',f2,IntegerType())
myfunc=udf(f2,IntegerType())
df3.select('_c0',myfunc('_c0')).show()
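Because the UDF was registered under the name myfunc, it can also be called from SQL; a sketch, where t1 is just a temp-view name chosen here:
df3.createOrReplaceTempView('t1')
spark.sql("select _c0, myfunc(_c0) from t1").show()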
from pyspark.sql import Row
r1=Row(age=20,salary=20000,name=None,date_time='2014-12-23')
r2=Row(salary=40000,name='shanker',age=None,date_time=None)
r3=Row(age=30,name='priya',salary=40000,date_time='2015-11-10')
df4=spark.createDataFrame([r1,r2,r3])
df4.show()
+----+----------+-------+------+
| age| date_time|   name|salary|
+----+----------+-------+------+
|  20|2014-12-23|   null| 20000|
|null|      null|shanker| 40000|
|  30|2015-11-10|  priya| 40000|
+----+----------+-------+------+
>>> df4.printSchema()
root
 |-- age: long (nullable = true)
 |-- date_time: string (nullable = true)
 |-- name: string (nullable = true)
 |-- salary: long (nullable = true)
df4.na.fill(-1).na.fill('notprovided').show()
df4.fillna(-1).fillna('notprovided').show()
+---+-----------+-----------+------+
|age|  date_time|       name|salary|
+---+-----------+-----------+------+
| 20| 2014-12-23|notprovided| 20000|
| -1|notprovided|    shanker| 40000|
| 30| 2015-11-10|      priya| 40000|
+---+-----------+-----------+------+
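na.fill also accepts a dict, so different columns can get different replacement values in one call (a sketch using the columns of df4):
df4.na.fill({'age': -1, 'name': 'notprovided', 'date_time': 'notprovided'}).show()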
from pyspark.sql.functions import col
>>> df5=df4.withColumn('date_time',col("date_time").cast('timestamp'))
>>> df5.printSchema()
root
 |-- age: long (nullable = true)
 |-- date_time: timestamp (nullable = true)
 |-- name: string (nullable = true)
 |-- salary: long (nullable = true)
from pyspark.sql.functions import year
df5.select('date_time',year('date_time')).show()
+-------------------+---------------+
|          date_time|year(date_time)|
+-------------------+---------------+
|2014-12-23 00:00:00|           2014|
|               null|           null|
|2015-11-10 00:00:00|           2015|
+-------------------+---------------+
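Once date_time is a real timestamp, the other date/time functions work the same way; a short sketch:
from pyspark.sql.functions import month,dayofmonth,hour
df5.select('date_time',month('date_time'),dayofmonth('date_time'),hour('date_time')).show()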
sample1.csv file contents:
1,hyd,2013-12-24 23:14:45
2,chn,2018-11-30 11:12:34
df6=spark.read.format('csv').option('inferSchema',True).load("file:///home/hadoop/sample1.csv")
df6.printSchema()
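Since this CSV has no header row, the columns arrive as _c0, _c1 and _c2. They can be renamed in one shot with toDF; the names id, city and event_time below are only illustrative:
df7=df6.toDF('id','city','event_time')
df7.printSchema()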
from pyspark.sql.functions import max   # shadows Python's built-in max
l1=[12,23,34,45]
print(max(l1))                           # error: Spark's max expects a column, not a Python list
from __builtin__ import max              # restore the built-in max (the module is builtins in Python 3)
print(max(l1))
45
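A common way to avoid this shadowing altogether is to import the functions module under an alias instead of importing individual names (a sketch):
import pyspark.sql.functions as F
print(max(l1))                    # built-in max is untouched
df6.select(F.max('_c0')).show()   # Spark's max is reached through the alias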
df6.dropDuplicates().show()
==> dropDuplicates removes duplicate rows; DataFrame methods like this can be discovered with tab completion in the PySpark shell
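dropDuplicates can also be restricted to a subset of columns; a sketch against df6:
df6.dropDuplicates(['_c1']).show()   # keep one row for each distinct _c1 value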