Wordcount Example using pyspark
from pyspark.sql import SparkSession
spark=SparkSession.builder.master('local').appName('app').getOrCreate()
spark
To find the Spark UI URL:
spark.sparkContext.uiWebUrl
'http://10.0.2.15:4040'
Jobs:
In Spark, operations are of two kinds: transformations and actions.
A set of transformations --> Result (printed or saved somewhere).
In the Spark UI, actions show up as jobs.
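A minimal sketch of the idea (the numbers below are illustrative, not from the example that follows): the transformations alone do nothing; only the action at the end triggers a job that shows up in the Spark UI.
nums=spark.sparkContext.parallelize(range(1,11))   #illustrative RDD
evens=nums.filter(lambda x:x%2==0)                 #transformation, nothing runs yet
doubled=evens.map(lambda x:x*2)                    #transformation, still nothing runs
print(doubled.collect())                           #action --> triggers a Spark job, prints [4, 8, 12, 16, 20]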
l1=[23,34,45,67]
r1=spark.sparkContext.parallelize(l1) #create an RDD from a collection
r1.collect()
t1=('hyd','chn','del')
r2=spark.sparkContext.parallelize(t1)
r2.collect()
r3=spark.sparkContext.range(10)
r3.collect()
RDDs/DataFrames --> built from files, tables, or collections
parallelize --> creates an RDD out of a local collection
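For the DataFrame side of that note, a small sketch (the column names and the csv path are illustrative):
df=spark.createDataFrame([(1,'hyd'),(2,'chn'),(3,'del')],['id','city'])   #DataFrame out of a local collection
df.show()
#df2=spark.read.csv("file:///home/hadoop/cities.csv",header=True)         #DataFrame out of a file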
Contents of sample.txt:
spark is a processing framework
spark is fatster than mapreduce
hadoop and spark work together
rd1=spark.sparkContext.textFile("file:///home/hadoop/sample.txt")
rd1.collect()
[u'spark is a processing framework', u'spark is fatster than mapreduce', u'hadoop and spark work together', u'']
rd1.count()
4
rd2=rd1.map(lambda x:x.encode('utf-8'))
print(rd2.collect())
['spark is a processing framework', 'spark is fatster than mapreduce', 'hadoop and spark work together', '']
rd3=rd2.map(lambda x:x.split(' '))
rd3.collect()
[['spark', 'is', 'a', 'processing', 'framework'], ['spark', 'is', 'fatster', 'than', 'mapreduce'], ['hadoop', 'and', 'spark', 'work', 'together'], ['']]
rd3=rd2.flatMap(lambda x:x.split(' ')).map(lambda x:(x,1)).reduceByKey(lambda x,y:x+y)
rd3.collect()
[('a', 1), ('and', 1), ('fatster', 1), ('processing', 1), ('', 1), ('is', 2), ('work', 1), ('mapreduce', 1), ('together', 1), ('hadoop', 1), ('framework', 1), ('spark', 3), ('than', 1)]
rd3.saveAsTextFile("file:///home/hadoop/sample_wordcount")
Contents of the saved output file:
('a', 1)
('and', 1)
('fatster', 1)
('processing', 1)
('', 1)
('is', 2)
('work', 1)
('mapreduce', 1)
('together', 1)
('hadoop', 1)
('framework', 1)
('spark', 3)
('than', 1)
reduceByKey (aggregation) --> groups by key and aggregates the values
map(function) --> applies the function to each element of the RDD
flatMap(function) --> applies the function to each element of the RDD and flattens the result
x1=rd2.map(lambda x:x.split(' '))
print(x1.collect())
x2=rd2.flatMap(lambda x:x.split(' '))
print(x2.collect())
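Isolating reduceByKey from the word count above, a minimal sketch (the pairs are made up for illustration):
pairs=spark.sparkContext.parallelize([('hyd',10),('chn',5),('hyd',3),('del',7)])
totals=pairs.reduceByKey(lambda x,y:x+y)   #groups by key and adds up the values
print(totals.collect())                    #[('hyd', 13), ('chn', 5), ('del', 7)] (order may vary)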
each action --> a Spark job --> DAG, submission time, and other details in the Spark UI
Disk --> RAM --> Cache
Cache --> much faster than the others
lazy evaluation
------------------
transformations are lazily evaluated --> they stay passive as long as no action involves them
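A small sketch of lazy evaluation and caching, reusing rd1 from the word-count example above:
words=rd1.flatMap(lambda x:x.split(' '))   #transformation only --> nothing runs yet
words.cache()                              #cache() is also lazy, it just marks the RDD to be kept in memory
words.count()                              #first action computes the RDD and fills the cache
words.take(5)                              #later actions reuse the cached data instead of re-reading the file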
by default, Spark caches data that is used most frequently in operations
persistence --> we can make Spark preserve large amounts of data in RAM or on disk
Spark uses memory and CPU only as long as the application (its operations) is running;
once the operations are done, nothing remains in memory or on disk,
and the RAM and CPU are freed.
by default --> RAM --> 1 GB
cores (CPU) --> local / local[1] --> 1 core will be allocated
local[3]-->3 cores will be allocated
local[n]-->n cores will be allocated
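A minimal sketch of asking for more cores when building the session (the app name is illustrative; getOrCreate() returns the already-running session if there is one):
from pyspark.sql import SparkSession
spark=SparkSession.builder.master('local[3]').appName('app').getOrCreate()   #local[3] --> 3 cores; local[*] --> all available cores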
persistence --> we can ask Spark to preserve some data in memory even after the job is done.
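A minimal sketch of explicit persistence, reusing rd3 from the word count above (MEMORY_AND_DISK is just one of the available storage levels):
from pyspark import StorageLevel
rd3.persist(StorageLevel.MEMORY_AND_DISK)   #keep rd3 in memory, spill to disk if it does not fit
rd3.count()                                 #first action materialises and persists the RDD
rd3.collect()                               #later actions reuse the persisted data
rd3.unpersist()                             #free the memory/disk when done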