Stages, Tasks, Executors
Persistence:
from pyspark import StorageLevel
r1=spark.sparkContext.textFile("...")
r2=r1.map(lambda x:x+'python')
r3=r2.filter(lambda x:'X' in x)
r4=r3.flatMap(lambda x:x)
# r4.persist()                        # default storage level (MEMORY_ONLY)
r4.persist(StorageLevel.DISK_ONLY)    # a storage level can be assigned only once
print(r4.take(10))
spark.stop()  # stop the spark application
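A minimal self-contained sketch of the same persistence flow (parallelize stands in for the elided file path, which is an assumption, so it runs anywhere):

from pyspark.sql import SparkSession
from pyspark import StorageLevel

spark = SparkSession.builder.appName("persist-demo").getOrCreate()

# small in-memory dataset so no input file is needed
rdd = spark.sparkContext.parallelize(["alpha", "beta", "gamma"] * 1000)
mapped = rdd.map(lambda x: x + 'python')

# pick ONE storage level; persisting again with a different level raises an error
mapped.persist(StorageLevel.DISK_ONLY)   # or mapped.persist() for MEMORY_ONLY

print(mapped.take(10))   # the first action materializes and caches the RDD
print(mapped.count())    # reuses the persisted data instead of recomputing

spark.stop()             # once the application stops, the persisted data is gone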
rdd.persist(): when the application terminates, the entire data and execution
information is lost, including the persisted data (irrespective of whether the
data was persisted to memory or to disk).
sparkapp1: 20 transformations --> result, then the application is terminated
sparkapp2: uses the result of the first application
r4.coalesce(1).saveAsTextFile("c.....")
2nd application
r1=spark.sparkContext.textFile("c/....")
r2=r1.map(lambda x:x)
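A hedged end-to-end sketch of this hand-off, written as one script for illustration (the /tmp/app1_result path and the parallelize input are assumptions, not from the notes):

from pyspark.sql import SparkSession

# ---- application 1: compute once, save the result to storage ----
spark = SparkSession.builder.appName("app1").getOrCreate()
r4 = spark.sparkContext.parallelize(range(100)).map(lambda x: x * 2)
r4.coalesce(1).saveAsTextFile("/tmp/app1_result")   # assumed output path
spark.stop()   # lineage and persisted data are gone after this

# ---- application 2: start from the saved files, not from lineage ----
spark = SparkSession.builder.appName("app2").getOrCreate()
r1 = spark.sparkContext.textFile("/tmp/app1_result")
r2 = r1.map(lambda x: x)
print(r2.take(5))
spark.stop()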
The entire information about a Spark application's execution, including the
persisted data information, is maintained in the "lineage".
If memory crashes, the lineage is also lost and Spark forgets everything.
Irrespective of a lineage crash, is it possible to save the intermediate data
somewhere on disk? Yes, that is what checkpointing does.
Does persistence depend on lineage? Yes. Checkpointing does not depend on lineage.
Persistence can be done in memory or on disk.
Checkpointing can only be done on disk.
spark.sparkContext.setCheckpointDir("C://user/cloudera...")
# set the checkpoint location; HDFS is preferred
r4.checkpoint()
# checkpoint the RDD
r5=r4.map(lambda x:x)
r6=r5.map(lambda x:x)
r6.count()
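A self-contained sketch of the checkpoint flow (the local checkpoint directory and the parallelize input are assumptions so the example runs without HDFS):

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("checkpoint-demo").getOrCreate()
sc = spark.sparkContext

sc.setCheckpointDir("/tmp/spark_checkpoints")   # on a cluster, prefer an HDFS path

r4 = sc.parallelize(range(1000)).map(lambda x: x + 1).filter(lambda x: x % 2 == 0)
r4.checkpoint()             # mark the RDD to be written to the checkpoint dir

r5 = r4.map(lambda x: x)
r6 = r5.map(lambda x: x)
print(r6.count())           # the first action triggers the checkpoint write
print(r4.isCheckpointed())  # True once the data is on disk

spark.stop()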
----------------------------------------------------
Driver, executor, task, stage
m1        m2        m3        m4         (machines / nodes)
b1        b2        b3        b4         (blocks on disk)
E1        E2        E3        E4         (executors)

r1=spark.sparkContext.textFile(' ')
p1        p2        p3        p4         (partitions in memory)  ---> r1

r2=r1.t1
p1.t1     p2.t1     p3.t1     p4.t1      (task t1 on each partition)
p21       p22       p23       p24        ---> r2
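A small sketch to confirm the partition layout from the driver (parallelize with an explicit partition count is an assumption standing in for the blocks above):

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("partitions-demo").getOrCreate()
sc = spark.sparkContext

# for a file source one partition is typically created per block/split;
# here the partition count is set explicitly so the example runs anywhere
r1 = sc.parallelize(range(1000), 4)
print(r1.getNumPartitions())   # 4

r2 = r1.map(lambda x: x * 2)   # narrow transformation keeps the same layout
print(r2.getNumPartitions())   # still 4, so this stage runs as 4 tasks

spark.stop()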
What amount of memory and CPU is needed to process a partition on a machine?
On each node, a certain amount of memory and CPU is needed.
A logical unit created with a certain amount of memory and CPU on a machine is
called an executor (it is a logical unit). An executor executes a partition.
Each executor takes care of one partition at a time.
When a Spark application is launched, the executors are created by Spark
(see the configuration sketch after the layouts below).
r1=sc.textFile("..")  ---> partitions: 5
r2=r1.t1  --> 5 partitions --> 5 tasks
r3=r2.t2  --> 5 tasks
r4=r3.t3  --> 5 tasks
r5=r4.t4  --> 5 tasks
How many executors and partitions?
Executors --> 5; partitions (DATA) --> 5 per RDD
Partition --> a chunk of data.
Executor --> a logical resource/unit for processing the partitions.
Task --> the computation/processing of one partition by an executor.
Process vs task: the computation of an RDD is a process; the computation of one
partition is a task.
m1       m2       m3       m4        (machines)
p1,p2    p3,p4    p5,p6    p7,p8     (8 partitions)
E1       E2       E3       E4        (4 executors: two partitions each)
E1,E2    E3,E4    E5,E6    E7,E8     (8 executors: one partition each)
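A hedged sketch of how executor resources are requested at launch time (the numbers are arbitrary illustrations; in a local-mode run some of these settings have no effect):

from pyspark.sql import SparkSession

# each executor is a logical unit of memory + CPU cores on a worker node;
# these configs decide how many executors are created and how big they are
spark = (SparkSession.builder
         .appName("executor-demo")
         .config("spark.executor.instances", "4")   # 4 executors (E1..E4)
         .config("spark.executor.memory", "2g")     # memory per executor
         .config("spark.executor.cores", "2")       # CPU cores per executor
         .getOrCreate())

# equivalent spark-submit flags: --num-executors 4 --executor-memory 2g --executor-cores 2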
Redistribution (shuffling) of data: narrow and wide
---------------------------------------------------
Narrow transformations form a pipeline --> no deviation, no redistribution/shuffling of data.
Wide transformations --> data is transferred (shuffled) among the partitions.
r1=sc.textFile("...")  -- stage 1
r2=r1.nt1              PIPELINE 1
r3=r2.nt2
r4=r3.wt3  ----------- stage 2
r5=r4.nt4              PIPELINE 2
r6=r5.nt5
r7=r6.wt6  ----------- stage 3
The stages will be created by a scheduler called the DAG scheduler.
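A sketch that makes the stage split visible (reduceByKey stands in here for the wide transformations wt3/wt6, which is an assumption; toDebugString prints the lineage with its shuffle boundaries):

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("stages-demo").getOrCreate()
sc = spark.sparkContext

words = sc.parallelize(["spark", "python", "spark", "scala"] * 100)

# narrow transformations: pipelined inside one stage, no shuffle
pairs = words.map(lambda w: (w, 1)).filter(lambda kv: len(kv[0]) > 2)

# wide transformation: shuffles data between partitions --> new stage
counts = pairs.reduceByKey(lambda a, b: a + b)

print(counts.toDebugString().decode())   # lineage with the shuffle boundary marked
print(counts.collect())

spark.stop()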