Friday, March 8, 2019

PySpark Stages, Tasks, Executors


Stages, Tasks, Executors
Persistence:

from pyspark import StorageLevel

r1 = spark.sparkContext.textFile("...")
r2 = r1.map(lambda x: x + 'python')
r3 = r2.filter(lambda x: 'X' in x)
r4 = r3.flatMap(lambda x: x)
r4.persist()                              # default storage level (MEMORY_ONLY)
# an RDD can be persisted with only one storage level; to use disk instead:
# r4.persist(StorageLevel.DISK_ONLY)
print(r4.take(10))
spark.stop()    # stop the spark application
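Whether an RDD is marked for persistence, and at which storage level, can be checked on the RDD itself. A minimal sketch, using r4 from the snippet above (before spark.stop() is called):

print(r4.is_cached)            # True once persist()/cache() has been requested
print(r4.getStorageLevel())    # shows the StorageLevel flags (disk / memory / replication)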

When the application terminates, the entire data and information will be lost, including
persisted data (irrespective of whether the data was persisted to memory or disk).

sparkapp1                              sparkapp2
20 transformations --> result          uses the result of the first application
terminated

r4.coalesce(1).saveAsTextFile("c.....")

2nd application:
r1 = spark.sparkContext.textFile("c/....")
r2 = r1.map(lambda x: x)

The entire information about a Spark application's execution, including persisted-data information, is maintained with the "lineage".
If memory crashes, the lineage is also lost, and Spark forgets everything.

Irrespective of a lineage crash, is it possible to save the intermediate data somewhere on disk? Yes, that is checkpointing.
Does persistence depend on lineage? Yes.
Checkpointing does not depend on lineage.

Persistence can be done in memory or on disk.
Checkpointing can only be done on disk.

spark.sparkContext.setCheckpointDir("C://user/cloudera...")   # set the checkpoint location; HDFS is preferred
r4.checkpoint()             # mark the RDD for checkpointing
r5 = r4.map(lambda x: x)
r6 = r5.map(lambda x: x)
r6.count()                  # the action triggers computation and writes the checkpoint data
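Once an action has run, PySpark can report whether the RDD was actually checkpointed and where the data was written. A small sketch using the RDDs from the snippet above:

print(r4.isCheckpointed())       # True after the checkpoint has been written
print(r4.getCheckpointFile())    # path of the checkpoint data inside the checkpoint directory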
----------------------------------------------------
Driver, Executor, Task, Stage

m1         m2         m3         m4          (machines)
---------------------------------------------------------
b1         b2         b3         b4          (blocks on disk)
 |          |          |          |
E1         E2         E3         E4          (executors)
 |          |          |          |
p1         p2         p3         p4          (partitions in memory)  --->  r1 = spark.sparkContext.textFile(' ')
 |          |          |          |
p1.t1      p2.t1      p3.t1      p4.t1       (task t1 runs on each partition)
 |          |          |          |
p21        p22        p23        p24         --->  r2 = r1.t1

What amount of memory and CPU is needed for processing a partition on a machine?
On each node, a certain amount of memory and CPU is needed.

A logical unit that is created with a certain amount of memory and CPU on a machine is called an Executor (it is a logical unit).
An executor executes a partition.

Each executor takes care of one partition at a time.
When a Spark application is launched, the executors are created by Spark.
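How many executors are created, and how much memory and CPU each executor gets, comes from configuration. A minimal sketch, assuming the values shown (2 executors, 2 cores and 2g memory each) are only illustrative:

from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .appName("executor-demo")                  # illustrative application name
         .config("spark.executor.instances", "2")   # number of executors to request
         .config("spark.executor.cores", "2")       # CPU cores per executor
         .config("spark.executor.memory", "2g")     # memory per executor
         .getOrCreate())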

r1 = sc.textFile("..")   ---> partitions: 5
r2 = r1.t1   --> 5 partitions ---> 5 tasks
r3 = r2.t2   --> 5 tasks
r4 = r3.t3   --> 5 tasks
r5 = r4.t4   --> 5 tasks

How many executors and partitions?
Executors --> 5
Partitions (data) --> 5 per RDD
Partition --> a chunk of data
Executor --> a logical resource/unit for processing the partitions
Task --> the computation/processing of one partition by an executor

Process vs task:
the computation of an RDD is a process;
the computation of a single partition is a task.
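The number of tasks per stage equals the number of partitions, which can be checked directly on the RDD. A small sketch, assuming sc is an existing SparkContext:

r1 = sc.parallelize(range(100), 5)   # explicitly create 5 partitions
print(r1.getNumPartitions())         # 5 --> each transformation on r1 runs as 5 tasks
r2 = r1.map(lambda x: x * 2)
print(r2.getNumPartitions())         # still 5: map keeps the partitioning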

m1          m2          m3          m4          (machines)
p1,p2       p3,p4       p5,p6       p7,p8       (8 partitions)
E1          E2          E3          E4          (4 executors: each handles 2 partitions, one at a time)
E1,E2       E3,E4       E5,E6       E7,E8       (8 executors: one per partition)

Redistribution (shuffling) of data:
-------------------------------------
Transformations are either narrow or wide.

Narrow transformations form a pipeline --> no deviation, no redistribution/shuffling of data.
Wide transformations --> data is transferred among the partitions (a shuffle).
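A short sketch of the difference, assuming sc is an existing SparkContext: map is narrow (each output partition depends on only one input partition), while reduceByKey is wide (it shuffles data across partitions and starts a new stage).

words = sc.parallelize(["spark", "python", "spark", "hadoop"])
pairs = words.map(lambda w: (w, 1))               # narrow: no shuffle
counts = pairs.reduceByKey(lambda a, b: a + b)    # wide: shuffles data across partitions
print(counts.collect())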

r1 = sc.textFile(";;")                   -- stage 1
r2 = r1.nt1     PIPELINE 1 (narrow transformations)
r3 = r2.nt2
r4 = r3.wt3     -- wide transformation ----------- stage 2
r5 = r4.nt4     PIPELINE 2 (narrow transformations)
r6 = r5.nt5
r7 = r6.wt6     -- wide transformation ------------- stage 3

The stages will be created by a scheduler called the DAG scheduler.
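The stage boundaries that the DAG scheduler will create can be previewed from the lineage with toDebugString(). A minimal sketch, assuming sc is an existing SparkContext (indented blocks in the output are separated by shuffles, i.e. stage boundaries):

rdd = (sc.parallelize(["a", "b", "a"])
         .map(lambda w: (w, 1))                  # narrow
         .reduceByKey(lambda a, b: a + b))       # wide: introduces a stage boundary
print(rdd.toDebugString().decode())              # lineage showing the shuffle/stage boundaries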
                                                                    

                                                                          













