RDD --> Resilient Distributed Dataset (the basic/root object in Spark)
Dataframes -->
RDD --> unstructured data
DF --> structured/semi-structured data
unstructured data --> RDD operations --> organised into a structure ---> a DataFrame is built out of the RDD
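A minimal sketch of that flow, using a small in-memory list instead of a file and the spark session from the shell (the variable and column names here are illustrative, not from the notes below):
# unstructured lines -> RDD -> structured tuples -> DataFrame out of the RDD
lines = spark.sparkContext.parallelize(["Spark is processing framework",
                                        "spark is faster than map reduce"])
words = lines.map(lambda x: tuple(x.split(' ')[:4]))   # keep the first four fields of each line
df_sketch = words.toDF(["w1", "w2", "w3", "w4"])       # assumed column names
df_sketch.show()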
input file:
Spark is processing framework
spark is faster than map reduce
spark and mr are processing framework
df1=spark.read.format('csv').option('delimiter',' ').load('file:///home/hadoop/sample3')
df1.show()
+--------------------+
| value|
+--------------------+
|Spark is processi...|
|spark is faster t...|
|spark and mr are ...|
+--------------------+
df1=spark.read.format('csv').option('delimiter','##').load('file:///home/hadoop/sampe')
df1.show() --> an error is thrown (the CSV reader in Spark 2.x rejects a delimiter longer than one character)
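As a side note, Spark 3.0 and later are reported to accept multi-character delimiters in the CSV reader, so a read like this should work there; a hedged sketch reusing the same path ('sep' and 'delimiter' are equivalent option names):
# Sketch only: on Spark 3.0+ the CSV reader accepts a multi-character separator.
df1 = spark.read.format('csv').option('sep', '##').load('file:///home/hadoop/sampe')
df1.show()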
rd1=spark.sparkContext.textFile("file://home/karna/Desktop/sample")
print(rd1.collect())
[u'Spark#is#processing#framework', u'spark#is#faster#than#map#reduce', u'spark#and#mr#are#processing#framework']
rd2=rd1.map(lambda x:str(x))
print(rd2.map(lambda x:type(x)))
print(rd1.map(lambda x:str(x)))
PythonRDD[216] at RDD at PythonRDD.scala:53
(printing a mapped RDD only shows the RDD object because transformations are lazy; an action such as collect() is needed to see the data)
print(rd2.collect())
['Spark#is#processing#framework', 'spark#is#faster#than#map#reduce', 'spark#and#mr#are#processing#framework']
>>>
rd2.count()
3
rd3=rd2.map(lambda x:x.split('#'))
print(rd3.collect()) ----> each line is returned as a list of words
To find the number of words in each line:
[['Spark', 'is', 'processing', 'framework'], ['spark', 'is', 'faster', 'than', 'map', 'reduce'], ['spark', 'and', 'mr', 'are', 'processing', 'framework']]
>>> rd3.map(lambda x:len(x)).collect()
[4, 6, 6]
>>> rd4=rd3.map(lambda x:(x[0],x[1],x[2],x[3]))
>>> rd4.collect()
[('Spark', 'is', 'processing', 'framework'), ('spark', 'is', 'faster', 'than'), ('spark', 'and', 'mr', 'are')]
>>>
>>> print(rd4.map(lambda x:type(x)).collect())
[<type 'tuple'>, <type 'tuple'>, <type 'tuple'>]
df=rd4.toDF()
df.show()   --> default column names _1.._4 are assigned:
+-----+---+----------+---------+
| _1| _2| _3| _4|
+-----+---+----------+---------+
|Spark| is|processing|framework|
|spark| is| faster| than|
|spark|and| mr| are|
+-----+---+----------+---------+
df1=rd4.toDF(["id1","id2","id3","id4"])
df1.show()   --> same rows, but with the columns named id1..id4
df.printSchema()
root
 |-- _1: string (nullable = true)
 |-- _2: string (nullable = true)
 |-- _3: string (nullable = true)
 |-- _4: string (nullable = true)
df_rsv=spark.read.format('csv').load("file://home")
df_tsv=spark.read.format('csv').option('inferSchema',True).option('delimiter','\t').load("\home\\")
df_tsv.show(5)
df_tsv.printSchema()
from pyspark.sql.types import *
sch=StructType([StructField('id',StringType()),StructField('id2',StringType()),StructField('name',StringType()),StructField('country',StringType()),StructField('year',StringType()),StructField('game',StringType())])
df_tsv=spark.read.format('csv').option('inferSchema',True).option('delimiter','\t').schema(sch).load("\home\\")
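With an explicit schema supplied, the inferSchema option is not actually needed; a small hedged check that the declared names were applied (column names come from sch above):
# Sketch: verify the schema and look at a few of the declared columns.
df_tsv.printSchema()
df_tsv.select('id', 'name', 'year').show(5)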
create an RDD out of a DataFrame:
df_rdd=df_tsv.rdd   ===> each record becomes a Row object
df_rdd
>>> df.rdd.collect()
[Row(_1=u'Spark', _2=u'is', _3=u'processing', _4=u'framework'), Row(_1=u'spark', _2=u'is', _3=u'faster', _4=u'than'), Row(_1=u'spark', _2=u'and', _3=u'mr', _4=u'are')]
>>> df.rdd.take(3)
[Row(_1=u'Spark', _2=u'is', _3=u'processing', _4=u'framework'), Row(_1=u'spark', _2=u'is', _3=u'faster', _4=u'than'), Row(_1=u'spark', _2=u'and', _3=u'mr', _4=u'are')]
df_rdd.map(lambda x:type(x)).take(10)
[<class 'pyspark.sql.types.Row'>, <class 'pyspark.sql.types.Row'>, <class 'pyspark.sql.types.Row'>]
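Individual fields of a Row can be read by column name, by position, or pulled out as a plain dict; a minimal sketch against the small df built from rd4 (default column names _1.._4):
first_row = df.rdd.first()            # first Row object of the DataFrame
print(first_row['_1'], first_row[1])  # access by name and by position
print(first_row.asDict())             # whole Row as a Python dict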
rd4.toDF()
RDD(tuples) --> rd1.toDF(["c1","c2"])
RDD(Row)    --> df_rowrdd=spark.createDataFrame(df_rdd,sch)
df_rowrdd.show(5)
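A short hedged recap of the two conversion paths above, using the small rd4 RDD (variable names here are illustrative):
# 1) RDD of tuples -> DataFrame via toDF() with column names
df_from_tuples = rd4.toDF(["c1", "c2", "c3", "c4"])
# 2) RDD of Row objects -> DataFrame via createDataFrame() with a schema
df_from_rows = spark.createDataFrame(df_from_tuples.rdd, df_from_tuples.schema)
df_from_rows.show()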
>>> df5=rd4.toDF()
>>> df5.show()
+-----+---+----------+---------+
| _1| _2| _3| _4|
+-----+---+----------+---------+
|Spark| is|processing|framework|
|spark| is| faster| than|
|spark|and| mr| are|
+-----+---+----------+---------+
input file:
Spark,is,processing,framework
spark is faster than map reduce
spark and mr are processing framework and also computing frameworks
r1=spark.sparkContext.textFile("file:////home/karna/sample")
r2=r1.map(lambda x:str(x))
r3=r2.map(lambda y:y.split(','))
r3.collect()
r3.map(lambda x:len(x)).collect()
r4=r3.map(lambda x:(x[0],x[1],x[2],x[3],x[4],'nodata' if x[5]=='' else x[5],x[6],x[8],x[9]))
r4.collect()
work in a notepad (put the helper function in a script file):
def handlenull(x):
    if x=='':
        return "nodata"
    else:
        return x

k=handlenull('python')
print(k)
python py1.py
def handlenull(x):
    result=[]
    for i in range(len(x)):
        if x[i]=='':
            result.append("nodata")
        else:
            result.append(x[i])
    return result

r=handlenull([1,2,3])
print(r)
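The single-value handlenull above can also replace the long inline conditional used for r4; a hedged sketch that treats every empty string field as 'nodata':
# Sketch: apply handlenull to every field of every record in the RDD.
r4 = r3.map(lambda x: tuple(handlenull(f) for f in x))
print(r4.collect())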
DAG - Directed Acyclic Graph (the flow of execution of an RDD)
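The lineage behind this DAG can be inspected on any RDD; a minimal sketch using toDebugString (the exact formatting, and whether it prints as bytes, varies with the Spark/Python version):
# Sketch: toDebugString() shows the chain of transformations Spark will run.
print(rd4.toDebugString())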