Hive and Spark Integration:
hive-->warehouse
hive-->data(warehouse)
-->Metastore(RDBMS)
Hive--->HQL/SQL --> used to run queries against the data stored in HDFS.
metastore--> a database in an RDBMS that holds Hive's table metadata
---------------------------------------------------------------
Oracle-->SQL
data-->stored as files on the local filesystem (NTFS, ext4)
transactional workloads
metadata-->RDBMS
hive>show tables;
show databases;
show tables in default;
select * from default.accountdevice limit 5;
pyspark
spark and hive
----------------
From the Hadoop conf dir (HADOOP_HOME/etc/hadoop): core-site.xml and hdfs-site.xml
From the Hive conf dir (HIVE_HOME/conf): hive-site.xml
Copy these into SPARK_HOME/conf -> core-site.xml, hdfs-site.xml and hive-site.xml
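With those files in place, the pyspark shell connects to the Hive metastore automatically. In a standalone script the same thing has to be done explicitly; a minimal sketch (the warehouse path is an assumption, adjust it to your cluster):

# Sketch: build a Hive-enabled SparkSession in a standalone script.
# The warehouse dir is an assumption; the pyspark shell configures this for you.
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("hive-integration-demo") \
    .config("spark.sql.warehouse.dir", "/user/hive/warehouse") \
    .enableHiveSupport() \
    .getOrCreate()

spark.sql("show databases").show()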
spark.sql('show databases').count()
spark.sql('select * from default.accountservice limit 10').show(10)
df_hive1=spark.read.table('default.accountdevice')
spark.sql('insert into retail.employee values(103,"Sriram")')
DataFrame[]
>>> spark.sql('select * from retail.employee')
DataFrame[id: int, name: string]
>>> spark.sql('select * from retail.employee').show()
+---+-------+
| id| name|
+---+-------+
|102| RAMESH|
|103| SRiram|
|101|shankar|
|103| Sriram|
+---+-------+
>>> spark.read.table('retail.employee')
DataFrame[id: int, name: string]
>>> spark.read.table('retail.employee').show()
+---+-------+
| id| name|
+---+-------+
|102| RAMESH|
|103| SRiram|
|101|shankar|
|103| Sriram|
+---+-------+
df_hive1.printSchema()
root
|-- id: integer (nullable = true)
|-- name: string (nullable = true)
df_hive1.show(5)
+---+-------+
| id| name|
+---+-------+
|102| RAMESH|
|103| SRiram|
|101|shankar|
|103| Sriram|
+---+-------+
df_hive2=spark.sql('select * from default.accountdevice')
df_hive2.count()
df_hive1.count()
DSL --> domain-specific language: the DataFrame method-chaining API (a comparison with plain SQL follows the show(5) below)
df_hive1=spark.read.table('default.accountdevice').filter('device_id=29')
df_hive1.show(5)
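The same query can be written either as a SQL string or with the DSL; a short sketch assuming the default.accountdevice table used above:

# Sketch: SQL string vs. DataFrame DSL give equivalent results.
# Assumes default.accountdevice exists, as in the session above.
sql_df = spark.sql("select * from default.accountdevice where device_id = 29")
dsl_df = spark.read.table("default.accountdevice").filter("device_id = 29")
print(sql_df.count(), dsl_df.count())   # both counts should match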
df3=df_hive1.withColumn('new',current_timestamp())
df3=df_hive1.withColumn('Current_Date',current_date())
+---+-------+------------+
| id| name|Current_Date|
+---+-------+------------+
|102| RAMESH| 2019-03-02|
|103| SRiram| 2019-03-02|
|101|shankar| 2019-03-02|
|103| Sriram| 2019-03-02|
+---+-------+------------+
from pyspark.sql.functions import *   # must be run before current_date()/current_timestamp() can be used
df3.show(5)
df3.write.saveAsTable('default.testspark')
show tables in default;
select * from testsparkdf limit 10;
spark.sql("select * from testsparkdf2")
DataFrame[id: int, name: string, Current_Date: date]
>>> spark.sql("select * from testsparkdf2").show()
+---+-------+------------+
| id| name|Current_Date|
+---+-------+------------+
|101|shankar| 2019-03-02|
|102| RAMESH| 2019-03-02|
|103| SRiram| 2019-03-02|
|103| Sriram| 2019-03-02|
+---+-------+------------+
saveAsTable writes tables in Parquet format by default; other formats must be requested explicitly.
df3.write.format('csv').saveAsTable('default.testsparkdf2')
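A short sketch of controlling both the format and the write mode when saving a table (default.testspark_fmt is a hypothetical table name used only for illustration):

# Sketch: saveAsTable defaults to Parquet; format() and mode() override that.
# 'default.testspark_fmt' is a hypothetical table name.
df3.write.mode("overwrite").format("parquet").saveAsTable("default.testspark_fmt")
# Inspect how the table was stored (provider / SerDe rows).
spark.sql("describe formatted default.testspark_fmt").show(50, truncate=False)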
df4=spark.read.format('csv').load("hdfs://172.16.38.131:8020/user/hive/warehouse/testspark2/...../csv")
hadoop@hadoop:~$ hadoop fs -ls /user/hive/warehouse/testsparkdf3
Found 5 items
-rw-r--r-- 3 hadoop supergroup 0 2019-03-02 04:14 /user/hive/warehouse/testsparkdf3/_SUCCESS
-rw-r--r-- 3 hadoop supergroup 22 2019-03-02 04:14 /user/hive/warehouse/testsparkdf3/part-00000-f2e9991d-fbaa-40a7-b2c7-730d61b2898c-c000.csv
-rw-r--r-- 3 hadoop supergroup 22 2019-03-02 04:14 /user/hive/warehouse/testsparkdf3/part-00001-f2e9991d-fbaa-40a7-b2c7-730d61b2898c-c000.csv
-rw-r--r-- 3 hadoop supergroup 23 2019-03-02 04:14 /user/hive/warehouse/testsparkdf3/part-00002-f2e9991d-fbaa-40a7-b2c7-730d61b2898c-c000.csv
-rw-r--r-- 3 hadoop supergroup 22 2019-03-02 04:14 /user/hive/warehouse/testsparkdf3/part-00003-f2e9991d-fbaa-40a7-b2c7-730d61b2898c-c000.csv
hadoop@hadoop:~$
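Each part-0000N file above is one partition of the DataFrame being written. A small sketch, assuming the df3 used earlier, of producing a single output file instead (default.testsparkdf_single is a hypothetical table name):

# Sketch: reduce to one partition so the table is written as a single part file.
# 'default.testsparkdf_single' is a hypothetical table name.
df3.coalesce(1).write.format("csv").saveAsTable("default.testsparkdf_single")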
df4.show()
df4=spark.read.load("hdfs://172.16.38.131:8020/user/hive/warehouse/testspark2/...../csv")
df4.show()--> an error is thrown, because spark.read.load() assumes Parquet by default and these files are CSV
>>> df4=spark.read.format("csv").load("hdfs://127.0.0.1:9000/user/hive/warehouse/testsparkdf3/")
>>> df4.show()
+---+-------+----------+
|_c0| _c1| _c2|
+---+-------+----------+
|101|shankar|2019-03-02|
|102| RAMESH|2019-03-02|
|103| SRiram|2019-03-02|
|103| Sriram|2019-03-02|
+---+-------+----------+
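Reading raw CSV like this produces the generic _c0, _c1, _c2 column names seen above. A small sketch of attaching real names, either with toDF or an explicit schema (the names id/name/load_date are assumptions based on the data):

# Sketch: give the raw CSV columns meaningful names instead of _c0/_c1/_c2.
# The names id/name/load_date are assumptions based on the data shown above.
df4_named = df4.toDF("id", "name", "load_date")
df4_named.printSchema()

# Alternatively, declare names and types up front with a DDL schema string.
df4_typed = (spark.read.format("csv")
             .schema("id INT, name STRING, load_date DATE")
             .load("hdfs://127.0.0.1:9000/user/hive/warehouse/testsparkdf3/"))
df4_typed.show()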
HDFS-->Hive (HQL, MapReduce execution engine)
HDFS-->Spark (Spark execution engine)
With Hive-Spark integration we give Spark access to Hive's data as well as its schema.
On its own, Spark can read and write HDFS files, but not in a tabular fashion (no table metadata).
df5=spark.read.format('csv').option('delimiter','\t').load('file://home...tsv')
df5.show(4)
df5.write.saveAsTable('default.pysparkdf')
External table--> created with a LOCATION clause; dropping it removes only the metadata, not the underlying data.
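A small sketch of creating an external table over an existing HDFS directory via spark.sql (the table name and LOCATION path are hypothetical):

# Sketch: external Hive table pointing at an existing HDFS directory.
# Table name and LOCATION path are hypothetical, for illustration only.
spark.sql("""
  CREATE EXTERNAL TABLE IF NOT EXISTS default.ext_employee (
    id INT,
    name STRING
  )
  ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
  STORED AS TEXTFILE
  LOCATION 'hdfs://127.0.0.1:9000/user/hadoop/ext_employee'
""")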
mysql (the RDBMS hosting the Hive metastore)
select * from TBLS;
Thrift is a network protocol that is used to establish the connection to the Hive metastore server.
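A sketch of pointing Spark at a remote metastore over Thrift (the host is an assumption; 9083 is the conventional metastore port):

# Sketch: connect to a remote Hive metastore over Thrift instead of relying on
# a copied hive-site.xml. Host is an assumption; 9083 is the usual default port.
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("remote-metastore-demo") \
    .config("hive.metastore.uris", "thrift://172.16.38.131:9083") \
    .enableHiveSupport() \
    .getOrCreate()

spark.sql("show tables in default").show()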