PySpark Regular Expression Operations
Read the data from HDFS.
The data is unstructured text.
We have to clean the data with regular expressions and make it structured.
Write the data into Hive in ORC format, appending to the table every day.
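Based on the regular expressions used below, each raw call-log line is assumed to look roughly like this (hypothetical sample, not actual data):

98765432100123456789 SUCCESS 2021-01-01 10:00:00 2021-01-01 10:05:00

i.e. two 10-digit phone numbers written as 20 consecutive digits, a status of SUCCESS/DROPPED/FAILED, and start/end timestamps in YYYY-MM-dd HH:mm:ss format.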
from pyspark.sql.functions import col, current_date, date_add
from pyspark.sql import SparkSession
import json
import re
import sys
if __name__ == '__main__':
    # Reading the data from HDFS
    # creation of the Spark driver
    # reading the configuration properties from the JSON file
    with open(' ') as options:
        sparkoptions = json.load(options)
    master = sparkoptions['master']
    spark = SparkSession.builder.config('spark.master', 'yarn').config('spark.app.name', 'callapp').\
        config('spark.executor.memory', '2G').config('spark.executor.cores', '2').\
        config('spark.submit.deployMode', 'client').getOrCreate()
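    # Note: the 'master' value read from the JSON options file is not actually used above,
    # since 'yarn' is hardcoded. If the intent is to drive the master from the config file,
    # a minimal sketch (assuming the JSON holds a "master" key) would be:
    # spark = SparkSession.builder.config('spark.master', master).config('spark.app.name', 'callapp').getOrCreate()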
    data = spark.sparkContext.textFile("c://user//calllogdata")
    print(data.take(5))
    print(data.first())
    # encode each line as UTF-8 (a Python 2 idiom; on Python 3 the lines are already str and this step can be skipped)
    data2 = data.map(lambda x: x.encode('utf-8'))
    print(data2.first())
    x = " "  # sample record used to test extract() below
    def extract(str):
        # two 10-digit phone numbers appear as 20 consecutive digits
        phonenos = re.search(r'\d{20}', str)
        pnos = phonenos.group()
        fromno = pnos[0:10]
        print(fromno)
        tono = pnos[10:20]
        print(tono)
        # call status is one of SUCCESS / DROPPED / FAILED
        # (a looser pattern such as '[A-Z]{6,7}' would also match these words)
        status = re.search('SUCCESS|DROPPED|FAILED', str).group()
        print(status)
        # timestamps have the format YYYY-MM-dd HH:mm:ss
        timestamps = re.findall(r'(\d{4}-\d{2}-\d{2}\s\d{2}:\d{2}:\d{2})', str)
        starttime = timestamps[0]
        endtime = timestamps[1]
        print(starttime)
        print(endtime)
        # re.search  : matches anywhere in the string
        # re.match   : matches only at the beginning of the string
        # re.findall : returns all occurrences
        return (fromno, tono, status, starttime, endtime)
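    # A quick standalone illustration of the three functions noted above
    # (toy string, assumed purely for illustration):
    sample = "call 1234567890 ended 2021-01-01 10:00:00"
    print(re.search(r'\d{10}', sample).group())   # found anywhere in the string -> '1234567890'
    print(re.match(r'\d{10}', sample))            # must match at the start -> None here
    print(re.findall(r'\d{2}:\d{2}', sample))     # every non-overlapping occurrence -> ['10:00']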
    extract(x)
    data3 = data2.map(lambda x: extract(x))
    print(data3.take(5))
    # creating a DataFrame out of this RDD
    calldf1 = data3.toDF(["fromno", "tono", "status", "starttime", "endtime"])
    # the timestamp columns should have the format YYYY-MM-dd HH:mm:ss
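    # A sketch of an alternative (assumed, not from the original post): supply an explicit
    # schema instead of relying on toDF with plain column names.
    # from pyspark.sql.types import StructType, StructField, StringType
    # schema = StructType([StructField(c, StringType(), True)
    #                      for c in ["fromno", "tono", "status", "starttime", "endtime"]])
    # calldf1 = spark.createDataFrame(data3, schema)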
    calldf2 = calldf1.withColumn('starttime', col('starttime').cast('timestamp')).\
        withColumn('endtime', col('endtime').cast('timestamp'))
    calldf1.show(5)
    calldf2.show(5)
    calldf1.printSchema()
    calldf2.printSchema()
    # casting a non-timestamp string such as 'SUCCESS' to timestamp returns null
    calldf3 = calldf2.withColumn('status', col('status').cast('timestamp'))
    calldf3.show(5)
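    # A minimal sketch (toy data, assumed for illustration) of why the cast above yields null:
    tmp = spark.createDataFrame([('SUCCESS', '2021-01-01 10:00:00')], ['status', 'starttime'])
    tmp.select(col('status').cast('timestamp').alias('status_ts'),
               col('starttime').cast('timestamp').alias('start_ts')).show()
    # status_ts is null, start_ts becomes a valid timestamp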
    # add a partition column with the current date (or the next day, using date_add)
    calldf3 = calldf2.withColumn('day', current_date())
    calldf3 = calldf2.withColumn('day', date_add(current_date(), 1))
    calldf3.show(5)
    calldf3.write.format('orc').mode('append').partitionBy('day').save("c:\\")
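    # The problem statement mentions writing into Hive; a sketch assuming a hypothetical
    # Hive table name 'calllogs' (requires a Hive-enabled SparkSession):
    # calldf3.write.format('orc').mode('append').partitionBy('day').saveAsTable('calllogs')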
    df1 = spark.createDataFrame([(3, 1), (2, 2)], ['id1', 'id2'])
    df2 = spark.createDataFrame([('a', 1), ('b', 2)], ['id1', 'id2'])
    df1.schema == df2.schema
    # False - 'id1' is inferred as a numeric type in df1 but as a string type in df2
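    # Inspecting the two schemas makes the difference visible:
    df1.printSchema()   # id1: long, id2: long
    df2.printSchema()   # id1: string, id2: long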