Saturday, March 2, 2019


Pyspark Use case 2
---------
How to upgrade the Spark version:
pip install pyspark==2.3.2
from pyspark.sql import SparkSession
spark=SparkSession.builder.master('local').appName('sparkapp').getOrCreate()
spark
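A quick way to confirm the upgrade took effect is to check the version on the session itself (a small sketch, reusing the session created above):
# confirm the running version after the pip upgrade (should print 2.3.2 here)
print(spark.version)
# the underlying SparkContext is reachable from the same session
print(spark.sparkContext.master)   # local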
Sample.txt
----------------
spark spark spark spark spark spark spark
spark spark spark spark spark
spark
spark spark spark
r1=spark.sparkContext.textFile("c:\\Users\\Nethra\\Desktop\\sample.txt")
print(r1.collect())
[u'spark spark spark spark spark spark spark', u'spark spark spark spark spark ', u'spark', u'spark spark spark']
print(r1.count())
4
r2=r1.map(lambda x:str(x)).map(lambda x:x.split(" "))
print(r2.collect())
[['spark', 'spark', 'spark', 'spark', 'spark', 'spark', 'spark'], ['spark', 'spark', 'spark', 'spark', 'spark', ''], ['spark'], ['spark', 'spark', 'spark']]

Get the number of columns:
--------------------------
r3=r2.map(lambda x:len(x))
print(r3.collect())
[7, 6, 1, 3]
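As a side note (not part of the original run), the same per-line counts can be produced in one chained expression, and flatMap gives the total word count across the file:
# sketch: per-line word counts in a single chain
print(r1.map(lambda line: len(line.split(" "))).collect())   # [7, 6, 1, 3]
# sketch: total number of tokens across all lines (includes the empty token
# produced by the trailing space on line 2)
print(r1.flatMap(lambda line: line.split(" ")).count())      # 17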

Maximum column in the list:
-------------------------
columnsize=max(r3.collect())
print(columnsize)
7
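The same maximum can also be taken on the RDD directly, without collecting everything to the driver first (a minimal alternative sketch):
# sketch: RDD.max() avoids pulling the whole list back with collect()
print(r3.max())   # 7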
#a function to fill the missing words with "nodata"
#input is a list of variable size; output is a list of fixed size (columnsize)
def fillmissing(x):
    result=[]
    for i in range(columnsize):
        try:
            result.append(x[i])
        except IndexError:
            result.append('nodata')
    return result
print(fillmissing(['spark','spark']))
['spark', 'spark', 'nodata', 'nodata', 'nodata', 'nodata', 'nodata']
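An equivalent padding sketch (not from the original post) that relies on list arithmetic instead of try/except, assuming each input row has at most columnsize words:
# sketch: pad the list up to columnsize with 'nodata' entries
def fillmissing2(x):
    return x + ['nodata'] * (columnsize - len(x))
print(fillmissing2(['spark','spark']))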
       
resultdata=r2.map(lambda x:fillmissing(x))
print(resultdata.collect())
[['spark', 'spark', 'spark', 'spark', 'spark', 'spark', 'spark'], ['spark', 'spark', 'spark', 'spark', 'spark', '', 'nodata'], ['spark', 'nodata', 'nodata', 'nodata', 'nodata', 'nodata', 'nodata'], ['spark', 'spark', 'spark', 'nodata', 'nodata', 'nodata', 'nodata']]
# Convert RDD into dataframe
df1=resultdata.toDF(['c1','c2','c3','c4','c5','c6','c7'])
df1.show()
+-----+------+------+------+------+------+------+
|   c1|    c2|    c3|    c4|    c5|    c6|    c7|
+-----+------+------+------+------+------+------+
|spark| spark| spark| spark| spark| spark| spark|
|spark| spark| spark| spark| spark|      |nodata|
|spark|nodata|nodata|nodata|nodata|nodata|nodata|
|spark| spark| spark|nodata|nodata|nodata|nodata|
+-----+------+------+------+------+------+------+
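The same fixed-width table can also be built without the RDD detour, staying in the DataFrame API; this is only a sketch and assumes the same sample.txt path as above:
from pyspark.sql import functions as F
# read each line as one 'value' column, split it into an array column,
# then pull out a fixed number of positions (missing positions become null)
lines_df = spark.read.text("c:\\Users\\Nethra\\Desktop\\sample.txt")
words_df = lines_df.select(F.split('value', ' ').alias('words'))
fixed_df = words_df.select([F.col('words').getItem(i).alias('c%d' % (i + 1))
                            for i in range(7)])
# na.fill only touches the null cells, mirroring the 'nodata' padding above
fixed_df.na.fill('nodata').show()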
df2=spark.read.format('csv').option('inferSchema',True).option('header',False).load("file:///home/hadoop/sample1.csv")
df2.show(5)
+---+---+-------------------+
|_c0|_c1|                _c2|
+---+---+-------------------+
|  1|hyd|2013-12-24 23:14:45|
|  2|chn|2018-11-30 11:12:34|
+---+---+-------------------+
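Instead of inferSchema, the schema can be supplied explicitly; the column names below (id, city, event_time) are just illustrative, not from the original file:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, TimestampType
# sketch: an explicit schema avoids the extra pass over the data that inference needs
schema = StructType([
    StructField('id', IntegerType()),
    StructField('city', StringType()),
    StructField('event_time', TimestampType()),
])
df2b = spark.read.csv("file:///home/hadoop/sample1.csv", schema=schema)
df2b.printSchema()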
from pyspark.sql.functions import sqrt
from pyspark.sql.types import IntegerType
df2.select('_c0',sqrt('_c0')).show()
+---+------------------+
|_c0|         SQRT(_c0)|
+---+------------------+
|  1|               1.0|
|  2|1.4142135623730951|
+---+------------------+
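sqrt is just one of the built-in column functions; ordinary arithmetic on col() expressions works the same way (sketch, reusing df2):
from pyspark.sql.functions import col
# sketch: arithmetic column expression with an explicit alias
df2.select('_c0', (col('_c0') * 2).alias('doubled')).show()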
#create a UDF
def f2(x):
    return (x*x-x)/2
print(f2(20))
190
df3=df2.na.drop()
df3.show()
+---+---+-------------------+
|_c0|_c1|                _c2|
+---+---+-------------------+
|  1|hyd|2013-12-24 23:14:45|
|  2|chn|2018-11-30 11:12:34|
+---+---+-------------------+
#dropna() is a shortcut for na.drop()
df3=df2.dropna()
df3.show()
+---+---+-------------------+
|_c0|_c1|                _c2|
+---+---+-------------------+
|  1|hyd|2013-12-24 23:14:45|
|  2|chn|2018-11-30 11:12:34|
+---+---+-------------------+
#Registering the UDF with SparkSQL function
from pyspark.sql.functions import udf
spark.udf.register('myfunc',f2,IntegerType())
myfunc=udf(f2,IntegerType())
df3.select('_c0',myfunc('_c0')).show()
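Once registered with spark.udf.register, the same function can also be called from a SQL query; the temp view name t3 below is just a name chosen for this sketch:
# sketch: expose df3 as a temp view and call the registered UDF from SQL
df3.createOrReplaceTempView('t3')
spark.sql("select _c0, myfunc(_c0) from t3").show()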
from pyspark.sql import Row
r1 =Row(age=20,salary=20000,name=None,date_time='2014-12-23')
r2=Row(salary=40000,name='shanker',age=None,date_time=None)
r3=Row(age=30,name='priya',salary=40000,date_time='2015-11-10')
df4=spark.createDataFrame([r1,r2,r3])
df4.show()
+----+----------+-------+------+                                               
| age| date_time|   name|salary|
+----+----------+-------+------+
|  20|2014-12-23|   null| 20000|
|null|      null|shanker| 40000|
|  30|2015-11-10|  priya| 40000|
+----+----------+-------+------+

>>> df4.printSchema()
root
 |-- age: long (nullable = true)
 |-- date_time: string (nullable = true)
 |-- name: string (nullable = true)
 |-- salary: long (nullable = true)
#fill numeric nulls with -1 and string nulls with 'notprovided'
df4.na.fill(-1).na.fill('notprovided').show()
#equivalent shortcut: df4.fillna(-1).fillna('notprovided').show()
+---+-----------+-----------+------+
|age|  date_time|       name|salary|
+---+-----------+-----------+------+
| 20| 2014-12-23|notprovided| 20000|
| -1|notprovided|    shanker| 40000|
| 30| 2015-11-10|      priya| 40000|
+---+-----------+-----------+------+
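The fill value can also be given per column as a dict, which avoids chaining two fills (a small sketch on the same df4):
# sketch: one na.fill call with per-column replacement values
df4.na.fill({'age': -1, 'name': 'notprovided', 'date_time': 'notprovided'}).show()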
from pyspark.sql.functions import col
>>> df5=df4.withColumn('date_time',col("date_time").cast('timestamp'))
>>> df5.printSchema()
root
 |-- age: long (nullable = true)
 |-- date_time: timestamp (nullable = true)
 |-- name: string (nullable = true)
 |-- salary: long (nullable = true)
from pyspark.sql.functions import year
df5.select('date_time',year('date_time')).show()
+-------------------+---------------+
|          date_time|year(date_time)|
+-------------------+---------------+
|2014-12-23 00:00:00|           2014|
|               null|           null|
|2015-11-10 00:00:00|           2015|
+-------------------+---------------+
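Other date parts come out of the same timestamp column in the same way (sketch):
from pyspark.sql.functions import year, month, dayofmonth
# sketch: extract several date parts from the casted timestamp column
df5.select('date_time', year('date_time'), month('date_time'), dayofmonth('date_time')).show()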
sample1.csv:
1,hyd,2013-12-24 23:14:45
2,chn,2018-11-30 11:12:34
df6=spark.read.format('csv').option('inferSchema',True).load("file:///home/hadoop/sample1.csv")
df6.printSchema()
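Since no header was given, the inferred columns come back as _c0/_c1/_c2; toDF can rename them in one go (the names here are just illustrative):
# sketch: rename the auto-generated columns
df6_named = df6.toDF('id', 'city', 'event_time')
df6_named.printSchema()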
from pyspark.sql.functions import max   ==> this shadows Python's built-in max, so the statement below throws an error
from __builtin__ import max             ==> re-imports the built-in max function (Python 2)
l1=[12,23,34,45]
print(max(l1))
45
df6.dropDuplicates().show()             ==> removes duplicate rows (found via tab completion)
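A common way to avoid the name clash altogether is to import the functions module under an alias instead of importing max directly (sketch):
from pyspark.sql import functions as F
l1=[12,23,34,45]
print(max(l1))                      # built-in max is untouched
df6.select(F.max('_c0')).show()     # Spark's max stays namespaced under F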
