Thursday, February 28, 2019

PYSPARK PYCHARM PROJECT

Points:

  LFS (local file system) is not distributed

  HDFS, Hive, and Cassandra are distributed

  Spark driver creation (all three entry points are sketched after this list):

  SparkContext -----------> RDD

  SQLContext, HiveContext ==> DataFrames

  SparkSession ------> DataFrame, RDD

DataFrames ==> CSV, XML, JSON, tables (semi-structured/structured)

RDD --> unstructured data (needs cleaning)
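
For concreteness, here is a minimal sketch of the three entry points side by side (assuming Spark 2.x in local mode; the sample data is made up):

from pyspark import SparkContext
from pyspark.sql import SQLContext, SparkSession

sc = SparkContext(master='local', appName='entrypoints')   # SparkContext ==> RDD
rdd = sc.parallelize([1, 2, 3])

sqlc = SQLContext(sc)                                      # SQLContext ==> DataFrame
df1 = sqlc.createDataFrame([(1, 'a')], ['id', 'name'])

spark = SparkSession.builder.getOrCreate()                 # SparkSession ==> both
df2 = spark.createDataFrame([(2, 'b')], ['id', 'name'])
rdd2 = spark.sparkContext.parallelize([4, 5, 6])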
 

Step 1: Install PyCharm Community Edition.

Step 2: File --> Settings --> Python Interpreter --> choose the Python 2.7 version.

Step 3: Create a package (SQLPackage) --> then create a Python program --> Demo.py.

Step 4: Run the code below.


Input File:

[{
"Year": "2013",
"First Name": "JANE",
"County": "A",
"Sex": "F",
"Count": "27"
}, {
"Year": "2013",
"First Name": "JADE",
"County": "B",
"Sex": "M",
"Count": "26"
}, {
"Year": "2013",
"First Name": "JAMES",
"County": "C",
"Sex": "M",
"Count": "21"
}]
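
For reference, Spark infers the schema of this multiline JSON on read; since every value is quoted, all five fields come out as strings, and columns are ordered alphabetically. df.printSchema() should show something like:

# root
#  |-- Count: string (nullable = true)
#  |-- County: string (nullable = true)
#  |-- First Name: string (nullable = true)
#  |-- Sex: string (nullable = true)
#  |-- Year: string (nullable = true)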


Driver program (Demo.py):

from pyspark.sql import SparkSession

def createdriver():
    # build (or reuse) a local SparkSession for the application
    spark = SparkSession.builder.master('local').appName('demoapp').getOrCreate()
    return spark
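
A quick smoke test for the driver before wiring up the full main program (a sketch; run it as its own script):

from Demo import createdriver

spark = createdriver()
print(spark.version)        # the Spark version the session is running on
print(spark.sparkContext)   # the underlying SparkContext
spark.stop()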



Main program:

from Demo import createdriver
import logging

# logging configuration
# logging levels are DEBUG, INFO, WARN, ERROR, FATAL
logging.basicConfig(filename='c:\\Users\\Nethra\\Desktop\\spark1.log', level=logging.INFO)

if __name__ == '__main__':
    sparkdriver = createdriver()
    logging.info('Spark driver created successfully')

    # reading the input parameters
    logging.info('Reading input parameters')
    file_format = raw_input("Enter the file format\n")
    input_file = raw_input("Enter the input file name\n")
    logging.info('Input parameters read successfully')

    # reading the data
    df = sparkdriver.read.format(file_format).option('multiLine', True).load(input_file)
    logging.info('Reading done successfully')

    # processing the data
    df.show(10)
    df.write.format('csv').mode('overwrite').save('c:\\Users\\Nethra\\Desktop\\dfout')

    # read the CSV output back in and save it as a table
    df1 = sparkdriver.read.format('csv').load('c:\\Users\\Nethra\\Desktop\\dfout')
    df1.write.saveAsTable('dfTable')

    # stop the application/job
    logging.info('Stopping the application')
    sparkdriver.stop()
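
One caveat: the CSV written above has no header row, so reading it back yields generic column names (_c0, _c1, ...). A variant that keeps the names across the round trip, if you want them:

df.write.format('csv').mode('overwrite').option('header', True).save('c:\\Users\\Nethra\\Desktop\\dfout')
df1 = sparkdriver.read.format('csv').option('header', True).load('c:\\Users\\Nethra\\Desktop\\dfout')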



Example 2: SparkContext program

from pyspark import SparkContext

sc = SparkContext(master='local', appName='demoapp')
rd1 = sc.textFile("c:\\Users\\Nethra\\Desktop\\spark1.log")   # RDD of log lines
rd2 = rd1.map(lambda x: str(x))                               # transformation (lazy)
print(rd1.collect())                                          # action: pulls the lines to the driver
print(type(rd2))                                              # <class 'pyspark.rdd.PipelinedRDD'>
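
A quick way to see the lazy/eager split on the same RDD (assuming the log file contains lines tagged INFO, as the logging config above produces):

rd3 = rd1.filter(lambda line: 'INFO' in line)   # transformation: nothing runs yet
print(rd3.count())                              # action: triggers the job, prints the INFO line count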

Example 3: SparkSession program

from Demo import createdriver

spark = createdriver()

rd1 = spark.sparkContext.textFile("c:\\Users\\Nethra\\Desktop\\spark1.log")

print(rd1)            # the RDD object itself

print(rd1.collect())  # the file contents as a list of lines
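
Since a SparkSession exposes both APIs, the same RDD can be promoted to a DataFrame (a sketch; the column name line is arbitrary):

from pyspark.sql import Row

df = spark.createDataFrame(rd1.map(lambda x: Row(line=x)))
df.show(5, truncate=False)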


Example 4: Spark Hive integration on Windows

Driver program:


from pyspark.sql import SparkSession

def createSparkDriver():
    spark = SparkSession.builder.master('local').appName('demoapp').\
        config('hive.metastore.uris', 'thrift://localhost:9083').\
        config('spark.sql.warehouse.dir', 'hdfs://localhost:8020/user/hive/warehouse').\
        enableHiveSupport().\
        getOrCreate()
    return spark
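
Once the metastore service is running on localhost:9083, the session can query Hive directly (a sketch; dfTable is the table saved earlier):

spark = createSparkDriver()
spark.sql('show databases').show()
spark.sql('select * from dfTable').show(5)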

 
Main program:

from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext
import os

os.environ['HADOOP_HOME'] = 'C:\\hadoop'

# conf expects a SparkConf object; spark.jars.packages pulls in the spark-hive package
conf = SparkConf().set('spark.jars.packages', 'org.apache.spark:spark-hive_2.11:2.4.0')
sc = SparkContext(master='local', appName='demoapp', conf=conf)
sqlc = SQLContext(sc)

df2 = sqlc.read.format('json').option('multiLine', True).load("c:\\")

print(df2)

df2.show(5)
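
To round out the integration, the DataFrame could be persisted into Hive and read back through SQL (a sketch; the table name demo_json is hypothetical):

df2.write.mode('overwrite').saveAsTable('demo_json')
sqlc.sql('select * from demo_json').show(5)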