Friday, March 1, 2019

Pyspark USECASE1

RDD-->Resilient Distributed Dataset(basic/root object)
Dataframes-->

RDD-->unstructured
DF-->stru/semi
unstructured-->RDD operations-->is organised as structured--->Dataframe out of the
RDD

input file:
Spark is processing framework
spark is faster than map reduce
spark and mr are processing framework

df1=spark.read.format('csv').option('delimeter',' ').load('file:///home/hadoop/sample3")

df1.show()

+--------------------+
|               value|
+--------------------+
|Spark is processi...|
|spark is faster t...|
|spark and mr are ...|
+--------------------+

df1=spark.read.format('csv').option('delimeter','##').load('file:///home/hadoop/sampe")

df1.show()-->error is thrown

rd1=spark.sparkContext.textFile("file://home/karna/Desktop/sample")

print(rd1.collect())

[u'Spark#is#processing#framework', u'spark#is#faster#than#map#reduce', u'spark#and#mr#are#processing#framework']

rd2=rd1.map(lamda x:str(x))
print(rd2.map(lambda x:type(x)))

 print(rd1.map(lambda x:str(x)))
PythonRDD[216] at RDD at PythonRDD.scala:53


print(rd2.collect())

['Spark#is#processing#framework', 'spark#is#faster#than#map#reduce', 'spark#and#mr#are#processing#framework']
>>>

rd2.count()
3

rd3=rd2.map(lambda x:x.split('##'))

print(rd3.collect())----> it is returned to the list

To find the length of each string
[['Spark', 'is', 'processing', 'framework'], ['spark', 'is', 'faster', 'than', 'map', 'reduce'], ['spark', 'and', 'mr', 'are', 'processing', 'framework']]

>>> rd3.map(lambda x:len(x)).collect()
[4, 6, 6]

>>> rd4=rd3.map(lambda x:(x[0],x[1],x[2],x[3]))

>>> rd4.collect()
[('Spark', 'is', 'processing', 'framework'), ('spark', 'is', 'faster', 'than'), ('spark', 'and', 'mr', 'are')]
>>>

>> print(rd4.map(lambda x:type(x)).collect())
[<type 'tuple'>, <type 'tuple'>, <type 'tuple'>]


df=rd4.toDF()

df1.show(()

df1=rd4.toDF(["id1","id2","id3","id4","id5"])

df1.show()

+-----+---+----------+---------+
|   _1| _2|        _3|       _4|
+-----+---+----------+---------+
|Spark| is|processing|framework|
|spark| is|    faster|     than|
|spark|and|        mr|      are|
+-----+---+----------+---------+

df.printSchema()
+-----+---+----------+---------+
|   _1| _2|        _3|       _4|
+-----+---+----------+---------+
|Spark| is|processing|framework|
|spark| is|    faster|     than|
|spark|and|        mr|      are|
+-----+---+----------+---------+


df_rsv=spark.read.format('csv').load('file://home")

df_tsv=spark.read.format('csv').option('inferSchema',True).option('delimeter','\t').
load("\home\\")

df_tsv.show(5)

df_tsv.printSchema

from pyspark.sql.types import *

sch=StructType([StructField('id',StringType()),StructField('id2',StringType()).StructField('name',StringType())]).StructField('country',StringType).StructField('year',StringType()).StructField('game',StringType()])



df_tsv=spark.read.format('csv').option('inferSchema',True).option('delimeter','\t').schema(sch).load("\home\\")

create RDD out of Dataframe
df_rdd=df_tsv.rdd===> changing to the ROW object
df_rdd

>>> rd2.collect()
[Row(_1=u'Spark', _2=u'is', _3=u'processing', _4=u'framework'), Row(_1=u'spark', _2=u'is', _3=u'faster', _4=u'than'), Row(_1=u'spark', _2=u'and', _3=u'mr', _4=u'are')]
>>> rd2.take(3)
[Row(_1=u'Spark', _2=u'is', _3=u'processing', _4=u'framework'), Row(_1=u'spark', _2=u'is', _3=u'faster', _4=u'than'), Row(_1=u'spark', _2=u'and', _3=u'mr', _4=u'are')]


df_rdd.map(lambda x:type(x)).take(10)
[<class 'pyspark.sql.types.Row'>, <class 'pyspark.sql.types.Row'>, <class 'pyspark.sql.types.Row'>]



rd4.toDF()

RDD(tuples) rd1.toDF(["c1","c2"})

RDD(ROW)-->df_rowrdd=spark.createDataFrame(df_rdd,sch)

df_rowrdd.show(5)

>>> df5=rd5.toDF()
>>> df5.show()
+-----+---+----------+---------+
|   _1| _2|        _3|       _4|
+-----+---+----------+---------+
|Spark| is|processing|framework|
|spark| is|    faster|     than|
|spark|and|        mr|      are|
+-----+---+----------+---------+

Spark,is,processing,framework
spark is faster than map reduce
spark and mr are processing framework and also computing frameworks

r1=spark.sparkContext.textFile("file:////home/karna/sample")

r2=r1.map(lamda x:(str)x)

r3=r2.map(lamda y:y.split(','))

r3.collect()

r3.map(lamda x:len(x)).collect()

r4=r3.map(lamda x:(x[0],x[1],x[2],x[3],x[4],if x[5]='' return 'nodata',x[6],x[8],x[9]))
r4.collect()

work in notepad
def handlenull(x):
  if x=='':
    return "nodata"
  else:
    return x

handlenull('python')
print(k)

python py1.py1
def handlenull(x):
result=[]
for i in range(10):
  if x[i]=='':
    return result.append("nodata")
  else:
    return result.append(x[i])

handlenull([1,2,3])
print(r)

DAG-Direct Asyclic Graph(flow of execution of RDD)











































No comments:

Post a Comment

Python Challenges Program

Challenges program: program 1: #Input :ABAABBCA #Output: A4B3C1 str1="ABAABBCA" str2="" d={} for x in str1: d[x]=d...