Monday, March 11, 2019

PYSPARK Regular Expression Operations


Read data from HDFS.
The data is unstructured text.
Clean the data with regular expressions and make it structured.
Write the data into Hive in ORC format, appending to the table every day.
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, current_date, date_add
import json
import re
import sys

if __name__ == '__main__':
    # reading the configuration properties from the JSON options file
    with open(' ') as options:      # path of the options JSON file goes here
        sparkoptions = json.load(options)
        master = sparkoptions['master']

    # creation of the Spark driver
    spark = SparkSession.builder.config('spark.master', master).\
        config('spark.app.name', 'callapp').\
        config('spark.executor.memory', '2G').\
        config('spark.executor.cores', '2').\
        config('spark.submit.deployMode', 'client').getOrCreate()

    # reading the data from HDFS
    data = spark.sparkContext.textFile("c://user//calllogdata")
    print(data.take(5))
    print(data.first())
    # encode each line as UTF-8 (needed on Python 2; on Python 3,
    # textFile already returns str, so this step can be skipped)
    data2 = data.map(lambda x: x.encode('utf-8'))
    print(data2.first())
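For reference, the options file read above might look something like this (hypothetical contents; only the master key is actually used here):

{"master": "yarn"}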

# sample input line (placeholder; extract() needs a real call-log record)
x = " "

def extract(line):
    # the first 20 digits in the record are the two 10-digit phone numbers
    phonenos = re.search(r'\d{20}', line)
    pnos = phonenos.group()
    fromno = pnos[0:10]
    print(fromno)
    tono = pnos[10:20]
    print(tono)
    # call status is one of a fixed set of keywords; the explicit
    # alternation is safer than a generic pattern like [A-Z]{6,7}
    status = re.search(r'SUCCESS|DROPPED|FAILED', line).group()
    print(status)
    # start and end timestamps, e.g. 2019-03-11 10:00:00
    timestamps = re.findall(r'\d{4}-\d{2}-\d{2}\s\d{2}:\d{2}:\d{2}', line)
    starttime = timestamps[0]
    endtime = timestamps[1]
    print(starttime)
    print(endtime)
    return (fromno, tono, status, starttime, endtime)

extract(x)

# re.search  - searches anywhere in the string
# re.match   - matches only at the beginning of the string
# re.findall - returns all occurrences as a list
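To see how the three behave differently, here is a small standalone check on a made-up record shaped like the ones the parser expects (20 digits, a status keyword, two timestamps):

import re

rec = "98765432100123456789 SUCCESS 2019-03-11 10:00:00 2019-03-11 10:05:00"

# search finds the first match anywhere in the string
print(re.search(r'SUCCESS|DROPPED|FAILED', rec).group())   # SUCCESS

# match only tries at the very beginning, so the status pattern fails here
print(re.match(r'SUCCESS|DROPPED|FAILED', rec))            # None

# findall returns every non-overlapping match as a list
print(re.findall(r'\d{4}-\d{2}-\d{2}\s\d{2}:\d{2}:\d{2}', rec))
# ['2019-03-11 10:00:00', '2019-03-11 10:05:00']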
data3 = data2.map(lambda x: extract(x))
print(data3.take(5))

from pyspark.sql.functions import col, current_date, date_add

# creating a DataFrame out of this RDD
calldf1 = data3.toDF(["fromno", "tono", "status", "starttime", "endtime"])
# the timestamp columns should have the format yyyy-MM-dd HH:mm:ss
calldf2 = calldf1.withColumn('starttime', col('starttime').cast('timestamp')).\
    withColumn('endtime', col('endtime').cast('timestamp'))

calldf1.show(5)
calldf2.show(5)
calldf1.printSchema()
calldf2.printSchema()
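If the casts succeed, the printSchema() calls should show the difference: calldf1 has all five columns as string (the RDD holds tuples of strings), while calldf2 comes out as:

# |-- fromno: string (nullable = true)
# |-- tono: string (nullable = true)
# |-- status: string (nullable = true)
# |-- starttime: timestamp (nullable = true)
# |-- endtime: timestamp (nullable = true)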

calldf3 = calldf2.withColumn('status', col('status').cast('timestamp'))
calldf3.show(5)
Casting a non-timestamp string like 'SUCCESS' to timestamp does not throw an error; Spark just returns null for every value it cannot parse.
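A quick standalone way to confirm this behavior (literal value made up for the check):

spark.createDataFrame([('SUCCESS',)], ['status']) \
    .withColumn('status', col('status').cast('timestamp')) \
    .show()
# +------+
# |status|
# +------+
# |  null|
# +------+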

# add a partition column: today's date...
calldf3 = calldf2.withColumn('day', current_date())
# ...or a shifted date via date_add (here, tomorrow)
calldf3 = calldf2.withColumn('day', date_add(current_date(), 1))
calldf3.show(5)
# append each day's data as a new partition in ORC format
calldf3.write.format('orc').mode('append').partitionBy('day').save("c:\\")
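The save() above writes ORC files to a plain path; to append into an actual Hive table as described at the top, a saveAsTable() sketch could look like this (the table name calllogs is made up, and the SparkSession needs enableHiveSupport()):

calldf3.write.format('orc') \
    .mode('append') \
    .partitionBy('day') \
    .saveAsTable('calllogs')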
df1 = spark.createDataFrame([(3, 1), (2, 2)], ['id1', 'id2'])
df2 = spark.createDataFrame([('a', 1), ('b', 2)], ['id1', 'id2'])

# schema is a property, not a method; the schemas differ because
# id1 is inferred as long in df1 but string in df2
df1.schema == df2.schema
# False
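Printing the schemas makes the mismatch obvious:

df1.printSchema()
# root
#  |-- id1: long (nullable = true)
#  |-- id2: long (nullable = true)

df2.printSchema()
# root
#  |-- id1: string (nullable = true)
#  |-- id2: long (nullable = true)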
