Pyspark JDBC and XML integeration :
from pyspark.sql import SparkSession
#sparkdriver=SparkSession.builder.master('local').config('spark.jars.packages','mysql:mysql-connector-java:5.1.44').config('spark.jars','c://..............").\
#config('spark.jars.repositories','http://repository.cloudera.com/artifactory/cloudera-repos').\
#appName('demoapp').getOrCreate()
#Reading from MySQL
from pyspark.sql import SparkSession
sparkdriver=SparkSession.builder.master('local').config('spark.jars.packages','mysql:mysql-connector-java:5.1.44').\
appName('demoapp').getOrCreate()
df_mysql=sparkdriver.read.format("jdbc").\
option('url','jdbc:mysql://localhost:3306').\
option('driver','com.mysql.jdbc.Driver').\
option('user','hadoop').\
option('password','hadoop').\
option('dbTable','world.city').load()
df_mysql.show(10)
| ID| Name|CountryCode| District|Population|
+---+----------------+-----------+-------------+----------+
| 1| Kabul| AFG| Kabol| 1780000|
| 2| Qandahar| AFG| Qandahar| 237500|
| 3| Herat| AFG| Herat| 186800|
| 4| Mazar-e-Sharif| AFG| Balkh| 127800|
| 5| Amsterdam| NLD|Noord-Holland| 731200|
| 6| Rotterdam| NLD| Zuid-Holland| 593321|
| 7| Haag| NLD| Zuid-Holland| 440900|
| 8| Utrecht| NLD| Utrecht| 234323|
| 9| Eindhoven| NLD|Noord-Brabant| 201843|
| 10| Tilburg| NLD|Noord-Brabant| 193238|
| 11| Groningen| NLD| Groningen| 172701|
| 12| Breda| NLD|Noord-Brabant| 160398|
| 13| Apeldoorn| NLD| Gelderland| 153491|
| 14| Nijmegen| NLD| Gelderland| 152463|
| 15| Enschede| NLD| Overijssel| 149544|
| 16| Haarlem| NLD|Noord-Holland| 148772|
| 17| Almere| NLD| Flevoland| 142465|
| 18| Arnhem| NLD| Gelderland| 138020|
| 19| Zaanstad| NLD|Noord-Holland| 135621|
| 20|´s-Hertogenbosch| NLD|Noord-Brabant| 129170|
+---+----------------+-----------+-------------+----------+
df_mysql.printSchema()
root
root
|-- ID: integer (nullable = true)
|-- Name: string (nullable = true)
|-- CountryCode: string (nullable = true)
|-- District: string (nullable = true)
|-- Population: integer (nullable = true)
#Filter the values
df_mysql.filter('CountryCode="NLD"' and 'District="Gelderland"').show(5)
#Read the json file
input file:
{"name":"Michael", "salary":3000}
{"name":"Andy", "salary":4500}
{"name":"Justin", "salary":3500}
{"name":"Berta", "salary":4000}
df_local=sparkdriver.read.format('json').load("file:///home/hadoop/employee.json")
df_local.show(5)
+-------+------+
| name|salary|
+-------+------+
|Michael| 3000|
| Andy| 4500|
| Justin| 3500|
| Berta| 4000|
df_local2=df_local.select('name','salary')
#Writing into SQL
df_mysqlw=df_local2.write.format("jdbc").\
option('url','jdbc:mysql://localhost:3306').\
option('driver','com.mysql.jdbc.Driver').\
option('user','hadoop').\
option('password','hadoop').\
option('dbTable','employee').\
mode('append').\
save()
#check in MYSQL
mysql> select * from employee;
+---------+--------+
| name | salary |
+---------+--------+
| Michael | 3000 |
| Andy | 4500 |
| Justin | 3500 |
| Berta | 4000 |
+---------+--------+
4 rows in set (0.00 sec)
#Reading the XML files
<Records>
<Rec>
<Name>John</Name>
<Age>10</Age>
<Gender>M</Gender>
</Rec>
<Rec>
<Name>Jenny</Name>
<Age>12</Age>
<Gender>F</Gender>
</Rec>
<Rec>
<Name>Janardhan</Name>
<Age>14</Age>
<Gender>M</Gender>
</Rec>
</Records>
pyspark --packages com.databricks:spark-xml_2.11:0.4.1
from pyspark.sql import SparkSession
sparkdriver=SparkSession.builder.master('local').config('spark.jars.packages','com.databricks:spark-xml_2.11:0.4.1').\
config('spark.jars','file:///home/hadoop/Downloads/spark-xml-0.1.1-s_2.11.jar').\
appName('demoapp1').getOrCreate()
df_xml=sparkdriver.read.format('xml').option('rowTag','Rec').load('file:///home/hadoop/records.xml')
df_xml.show()
+---+------+---------+
|Age|Gender| Name|
+---+------+---------+
| 10| M| John|
| 12| F| Jenny|
| 14| M|Janardhan|
+---+------+---------+
df_xml.select('Name','age').show(5)
+---------+---+
| Name|age|
+---------+---+
| John| 10|
| Jenny| 12|
|Janardhan| 14|
+---------+---+
#writing into ORC file
df_xml.write.format('orc').partitionBy('Gender').save("file:///home/hadoop/xml_orc/")
Output:
------------
total 8
drwxr-xr-x 2 hadoop hadoop 4096 Feb 27 02:03 'Gender=F'
drwxr-xr-x 2 hadoop hadoop 4096 Feb 27 02:03 'Gender=M'
-rw-r--r-- 1 hadoop hadoop 0 Feb 27 02:03 _SUCCESS
hadoop@hadoop:~/xml_orc$ cd Gender=F
hadoop@hadoop:~/xml_orc/Gender=F$ ls
part-00000-79c6d983-a31d-4fc4-9117-95250f3d3cbb.c000.snappy.orc
from pyspark.sql import SparkSession
#sparkdriver=SparkSession.builder.master('local').config('spark.jars.packages','mysql:mysql-connector-java:5.1.44').config('spark.jars','c://..............").\
#config('spark.jars.repositories','http://repository.cloudera.com/artifactory/cloudera-repos').\
#appName('demoapp').getOrCreate()
#Reading from MySQL
from pyspark.sql import SparkSession
sparkdriver=SparkSession.builder.master('local').config('spark.jars.packages','mysql:mysql-connector-java:5.1.44').\
appName('demoapp').getOrCreate()
df_mysql=sparkdriver.read.format("jdbc").\
option('url','jdbc:mysql://localhost:3306').\
option('driver','com.mysql.jdbc.Driver').\
option('user','hadoop').\
option('password','hadoop').\
option('dbTable','world.city').load()
df_mysql.show(10)
| ID| Name|CountryCode| District|Population|
+---+----------------+-----------+-------------+----------+
| 1| Kabul| AFG| Kabol| 1780000|
| 2| Qandahar| AFG| Qandahar| 237500|
| 3| Herat| AFG| Herat| 186800|
| 4| Mazar-e-Sharif| AFG| Balkh| 127800|
| 5| Amsterdam| NLD|Noord-Holland| 731200|
| 6| Rotterdam| NLD| Zuid-Holland| 593321|
| 7| Haag| NLD| Zuid-Holland| 440900|
| 8| Utrecht| NLD| Utrecht| 234323|
| 9| Eindhoven| NLD|Noord-Brabant| 201843|
| 10| Tilburg| NLD|Noord-Brabant| 193238|
| 11| Groningen| NLD| Groningen| 172701|
| 12| Breda| NLD|Noord-Brabant| 160398|
| 13| Apeldoorn| NLD| Gelderland| 153491|
| 14| Nijmegen| NLD| Gelderland| 152463|
| 15| Enschede| NLD| Overijssel| 149544|
| 16| Haarlem| NLD|Noord-Holland| 148772|
| 17| Almere| NLD| Flevoland| 142465|
| 18| Arnhem| NLD| Gelderland| 138020|
| 19| Zaanstad| NLD|Noord-Holland| 135621|
| 20|´s-Hertogenbosch| NLD|Noord-Brabant| 129170|
+---+----------------+-----------+-------------+----------+
df_mysql.printSchema()
root
root
|-- ID: integer (nullable = true)
|-- Name: string (nullable = true)
|-- CountryCode: string (nullable = true)
|-- District: string (nullable = true)
|-- Population: integer (nullable = true)
#Filter the values
df_mysql.filter('CountryCode="NLD"' and 'District="Gelderland"').show(5)
#Read the json file
input file:
{"name":"Michael", "salary":3000}
{"name":"Andy", "salary":4500}
{"name":"Justin", "salary":3500}
{"name":"Berta", "salary":4000}
df_local=sparkdriver.read.format('json').load("file:///home/hadoop/employee.json")
df_local.show(5)
+-------+------+
| name|salary|
+-------+------+
|Michael| 3000|
| Andy| 4500|
| Justin| 3500|
| Berta| 4000|
df_local2=df_local.select('name','salary')
#Writing into SQL
df_mysqlw=df_local2.write.format("jdbc").\
option('url','jdbc:mysql://localhost:3306').\
option('driver','com.mysql.jdbc.Driver').\
option('user','hadoop').\
option('password','hadoop').\
option('dbTable','employee').\
mode('append').\
save()
#check in MYSQL
mysql> select * from employee;
+---------+--------+
| name | salary |
+---------+--------+
| Michael | 3000 |
| Andy | 4500 |
| Justin | 3500 |
| Berta | 4000 |
+---------+--------+
4 rows in set (0.00 sec)
#Reading the XML files
<Records>
<Rec>
<Name>John</Name>
<Age>10</Age>
<Gender>M</Gender>
</Rec>
<Rec>
<Name>Jenny</Name>
<Age>12</Age>
<Gender>F</Gender>
</Rec>
<Rec>
<Name>Janardhan</Name>
<Age>14</Age>
<Gender>M</Gender>
</Rec>
</Records>
pyspark --packages com.databricks:spark-xml_2.11:0.4.1
from pyspark.sql import SparkSession
sparkdriver=SparkSession.builder.master('local').config('spark.jars.packages','com.databricks:spark-xml_2.11:0.4.1').\
config('spark.jars','file:///home/hadoop/Downloads/spark-xml-0.1.1-s_2.11.jar').\
appName('demoapp1').getOrCreate()
df_xml=sparkdriver.read.format('xml').option('rowTag','Rec').load('file:///home/hadoop/records.xml')
df_xml.show()
+---+------+---------+
|Age|Gender| Name|
+---+------+---------+
| 10| M| John|
| 12| F| Jenny|
| 14| M|Janardhan|
+---+------+---------+
df_xml.select('Name','age').show(5)
+---------+---+
| Name|age|
+---------+---+
| John| 10|
| Jenny| 12|
|Janardhan| 14|
+---------+---+
#writing into ORC file
df_xml.write.format('orc').partitionBy('Gender').save("file:///home/hadoop/xml_orc/")
Output:
------------
total 8
drwxr-xr-x 2 hadoop hadoop 4096 Feb 27 02:03 'Gender=F'
drwxr-xr-x 2 hadoop hadoop 4096 Feb 27 02:03 'Gender=M'
-rw-r--r-- 1 hadoop hadoop 0 Feb 27 02:03 _SUCCESS
hadoop@hadoop:~/xml_orc$ cd Gender=F
hadoop@hadoop:~/xml_orc/Gender=F$ ls
part-00000-79c6d983-a31d-4fc4-9117-95250f3d3cbb.c000.snappy.orc
No comments:
Post a Comment