Friday, March 1, 2019

PySpark Hive Integration

Hive and Spark Integration:
Hive-->warehouse
Hive-->data (the warehouse directory on HDFS)
    -->Metastore (table metadata, kept in an RDBMS)

Hive--->HQL/SQL --> which is used to query the data sitting in HDFS.

metastore--> a database in the RDBMS
---------------------------------------------------------------
Analogy with Oracle-->SQL:

data-->stored as files on a local filesystem (NTFS, ext4)

workload-->transactional

metadata-->kept inside the RDBMS itself

hive> show tables;

hive> show databases;

hive> show tables in default;

hive> select * from default.accountdevice limit 5;
pyspark


spark and hive
----------------
From the Hadoop conf dir (hadoophome/etc/hadoop): core-site.xml and hdfs-site.xml
From the Hive conf dir (hivehome/conf): hive-site.xml

Copy all three into sparkhome/conf-->core-site.xml, hdfs-site.xml and hive-site.xml
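
In the pyspark shell the session comes up with Hive support already enabled once those files are in place. In a standalone script you enable it yourself; a minimal sketch, assuming the conf files above are on the classpath (the app name is just an example):

from pyspark.sql import SparkSession

# Build a Hive-enabled session; without enableHiveSupport() Spark uses
# its own in-memory catalog instead of the Hive metastore.
spark = (SparkSession.builder
         .appName('hive-integration-demo')   # example name
         .enableHiveSupport()
         .getOrCreate())

spark.sql('show databases').show()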

spark.sql('show databases').count()

spark.sql('select * from default.accountservice limit 10').show(10)

df_hive1=spark.read.table('default.accountdevice')

spark.sql('insert into retail.employee values(103,"Sriram")')
DataFrame[]

>>> spark.sql('select * from retail.employee')
DataFrame[id: int, name: string]

>>> spark.sql('select * from retail.employee').show()
+---+-------+
| id|   name|
+---+-------+
|102| RAMESH|
|103| SRiram|
|101|shankar|
|103| Sriram|
+---+-------+

>>> spark.read.table('retail.employee')
DataFrame[id: int, name: string]

>>> spark.read.table('retail.employee').show()
+---+-------+
| id|   name|
+---+-------+
|102| RAMESH|
|103| SRiram|
|101|shankar|
|103| Sriram|
+---+-------+

df_hive1.printSchema()
root
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)

df_hive1.show(5)
+---+-------+
| id|   name|
+---+-------+
|102| RAMESH|
|103| SRiram|
|101|shankar|
|103| Sriram|
+---+-------+
df_hive2=spark.sql('select * from default.accountdevice')

df_hive2.count()

df_hive1.count()

DSL --> domain-specific language (the DataFrame API, an alternative to writing SQL strings)

df_hive1=spark.read.table('default.accountdevice').filter('device_id=29')

df_hive1.show(5)
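
The same filter can be written either as a SQL string or in the DSL; a small sketch of the two equivalent forms against the same table:

from pyspark.sql.functions import col

# SQL string form
sql_df = spark.sql('select * from default.accountdevice where device_id = 29')

# DSL (DataFrame API) form -- same rows, same result
dsl_df = spark.read.table('default.accountdevice').filter(col('device_id') == 29)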

from pyspark.sql.functions import current_date, current_timestamp

df3=df_hive1.withColumn('Current_Timestamp',current_timestamp())

df3=df_hive1.withColumn('Current_Date',current_date())

df3.show(5)
+---+-------+------------+
| id|   name|Current_Date|
+---+-------+------------+
|102| RAMESH|  2019-03-02|
|103| SRiram|  2019-03-02|
|101|shankar|  2019-03-02|
|103| Sriram|  2019-03-02|
+---+-------+------------+

df3.write.saveAsTable('default.testsparkdf')

Verify from the hive shell:

hive> show tables in default;

hive> select * from default.testsparkdf limit 10;

Spark stores tables in Parquet format by default. To store a table as CSV instead:

df3.write.format('csv').saveAsTable('default.testsparkdf2')

>>> spark.sql("select * from testsparkdf2")
DataFrame[id: int, name: string, Current_Date: date]

>>> spark.sql("select * from testsparkdf2").show()
+---+-------+------------+
| id|   name|Current_Date|
+---+-------+------------+
|101|shankar|  2019-03-02|
|102| RAMESH|  2019-03-02|
|103| SRiram|  2019-03-02|
|103| Sriram|  2019-03-02|
+---+-------+------------+
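
The save mode and format can also be spelled out explicitly; a sketch (the table name here is hypothetical):

# mode('overwrite') replaces the table if it already exists;
# format can be any built-in source: parquet, csv, json, orc.
(df3.write
    .mode('overwrite')
    .format('csv')
    .saveAsTable('default.testsparkdf_csv'))   # hypothetical name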

df4=spark.read.format('csv').load("hdfs://172.16.38.131:8020/user/hive/warehouse/testspark2/...../csv")

hadoop@hadoop:~$ hadoop fs -ls /user/hive/warehouse/testsparkdf3
Found 5 items
-rw-r--r--   3 hadoop supergroup          0 2019-03-02 04:14 /user/hive/warehouse/testsparkdf3/_SUCCESS
-rw-r--r--   3 hadoop supergroup         22 2019-03-02 04:14 /user/hive/warehouse/testsparkdf3/part-00000-f2e9991d-fbaa-40a7-b2c7-730d61b2898c-c000.csv
-rw-r--r--   3 hadoop supergroup         22 2019-03-02 04:14 /user/hive/warehouse/testsparkdf3/part-00001-f2e9991d-fbaa-40a7-b2c7-730d61b2898c-c000.csv
-rw-r--r--   3 hadoop supergroup         23 2019-03-02 04:14 /user/hive/warehouse/testsparkdf3/part-00002-f2e9991d-fbaa-40a7-b2c7-730d61b2898c-c000.csv
-rw-r--r--   3 hadoop supergroup         22 2019-03-02 04:14 /user/hive/warehouse/testsparkdf3/part-00003-f2e9991d-fbaa-40a7-b2c7-730d61b2898c-c000.csv
hadoop@hadoop:~$

df4.show()

df4=spark.read.load("hdfs://172.16.38.131:8020/user/hive/warehouse/testspark2/...../csv")

df4.show()-->error is thrown (no format was given, so Spark assumes the default Parquet format and cannot read the CSV files)

>>> df4=spark.read.format("csv").load("hdfs://127.0.0.1:9000/user/hive/warehouse/testsparkdf3/")

>>> df4.show()

+---+-------+----------+
|_c0|    _c1|       _c2|
+---+-------+----------+
|101|shankar|2019-03-02|
|102| RAMESH|2019-03-02|
|103| SRiram|2019-03-02|
|103| Sriram|2019-03-02|
+---+-------+----------+
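
The generic names _c0, _c1, ... appear because CSV files carry no schema of their own. A sketch of supplying one explicitly, assuming the columns match the id/name/Current_Date table written earlier:

from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DateType

# assumed schema for the CSV files under testsparkdf3
schema = StructType([StructField('id', IntegerType()),
                     StructField('name', StringType()),
                     StructField('Current_Date', DateType())])

df4 = spark.read.format('csv').schema(schema).load('hdfs://127.0.0.1:9000/user/hive/warehouse/testsparkdf3/')
df4.printSchema()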

HDFS-->Hive (HQL, MapReduce execution engine)
spark--->(Spark engine)
With Hive-Spark integration we are giving Spark access to the Hive data as well as its schema.

Spark can also read and write HDFS directly, but without the metastore there is no tabular view of that data.

df5=spark.read.format('csv').option('delimiter','\t').load('file:///home/...tsv')
df5.show(4)

df5.write.saveAsTable('default.pysparkdf')

External table-->defined with a LOCATION clause; see the sketch below.
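
A sketch of creating one: only the metastore entry is managed, so dropping the table leaves the files at LOCATION untouched (table name and path are hypothetical):

spark.sql("""
  create external table if not exists default.ext_demo (id int, name string)
  row format delimited fields terminated by ','
  location 'hdfs://127.0.0.1:9000/user/data/ext_demo'
""")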

In the MySQL database that backs the metastore:

mysql> select * from TBLS;   (lists every table Hive has registered)

Thrift is a network (RPC) protocol, which is used to establish the connection to the Hive server.
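
When the metastore runs as a separate service, Spark reaches it over thrift. A sketch of pointing a session at it directly, instead of via hive-site.xml (host and port are example values; 9083 is the conventional metastore port):

from pyspark.sql import SparkSession

# hive.metastore.uris tells Spark where the metastore thrift service listens
spark = (SparkSession.builder
         .config('hive.metastore.uris', 'thrift://127.0.0.1:9083')   # example endpoint
         .enableHiveSupport()
         .getOrCreate())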
