Tuesday, February 26, 2019

Pyspark JDBC and XML integration

from pyspark.sql import SparkSession

#Alternative session config: point spark.jars at a local connector jar and add
#a custom repository for resolving spark.jars.packages:
#sparkdriver=SparkSession.builder.master('local').\
#config('spark.jars.packages','mysql:mysql-connector-java:5.1.44').\
#config('spark.jars','c://..............').\
#config('spark.jars.repositories','http://repository.cloudera.com/artifactory/cloudera-repos').\
#appName('demoapp').getOrCreate()

#Reading from MySQL

sparkdriver=SparkSession.builder.master('local').\
config('spark.jars.packages','mysql:mysql-connector-java:5.1.44').\
appName('demoapp').getOrCreate()

#Read the world.city table over JDBC
df_mysql=sparkdriver.read.format('jdbc').\
option('url','jdbc:mysql://localhost:3306').\
option('driver','com.mysql.jdbc.Driver').\
option('user','hadoop').\
option('password','hadoop').\
option('dbtable','world.city').\
load()


df_mysql.show(20)

+---+----------------+-----------+-------------+----------+
| ID|            Name|CountryCode|     District|Population|
+---+----------------+-----------+-------------+----------+
|  1|           Kabul|        AFG|        Kabol|   1780000|
|  2|        Qandahar|        AFG|     Qandahar|    237500|
|  3|           Herat|        AFG|        Herat|    186800|
|  4|  Mazar-e-Sharif|        AFG|        Balkh|    127800|
|  5|       Amsterdam|        NLD|Noord-Holland|    731200|
|  6|       Rotterdam|        NLD| Zuid-Holland|    593321|
|  7|            Haag|        NLD| Zuid-Holland|    440900|
|  8|         Utrecht|        NLD|      Utrecht|    234323|
|  9|       Eindhoven|        NLD|Noord-Brabant|    201843|
| 10|         Tilburg|        NLD|Noord-Brabant|    193238|
| 11|       Groningen|        NLD|    Groningen|    172701|
| 12|           Breda|        NLD|Noord-Brabant|    160398|
| 13|       Apeldoorn|        NLD|   Gelderland|    153491|
| 14|        Nijmegen|        NLD|   Gelderland|    152463|
| 15|        Enschede|        NLD|   Overijssel|    149544|
| 16|         Haarlem|        NLD|Noord-Holland|    148772|
| 17|          Almere|        NLD|    Flevoland|    142465|
| 18|          Arnhem|        NLD|   Gelderland|    138020|
| 19|        Zaanstad|        NLD|Noord-Holland|    135621|
| 20|´s-Hertogenbosch|        NLD|Noord-Brabant|    129170|
+---+----------------+-----------+-------------+----------+

df_mysql.printSchema()
root
 |-- ID: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- CountryCode: string (nullable = true)
 |-- District: string (nullable = true)
 |-- Population: integer (nullable = true)
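
A useful variant of the same read (a sketch; the subquery below is illustrative, not from the original run): the dbtable option also accepts a parenthesized query with an alias, so MySQL filters the rows server-side before anything reaches Spark.

#Push a query down to MySQL: the subquery runs inside the database and Spark
#only receives the matching rows. Illustrative example.
df_pushdown=sparkdriver.read.format('jdbc').\
option('url','jdbc:mysql://localhost:3306').\
option('driver','com.mysql.jdbc.Driver').\
option('user','hadoop').\
option('password','hadoop').\
option('dbtable','(select ID, Name, Population from world.city where Population > 1000000) as big_city').\
load()

df_pushdown.show(5)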

#Filter the values (both predicates belong in one SQL string; Python's `and`
#between two strings would silently keep only the second condition)
df_mysql.filter('CountryCode="NLD" and District="Gelderland"').show(5)
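
The same filter can also be written with the column API instead of a SQL string (a minimal sketch using the standard pyspark.sql.functions.col helper):

from pyspark.sql.functions import col

#Equivalent filter using column expressions; & combines the two predicates
df_mysql.filter((col('CountryCode')=='NLD') & (col('District')=='Gelderland')).show(5)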

#Reading a JSON file

Input file (employee.json):
{"name":"Michael", "salary":3000}
{"name":"Andy", "salary":4500}
{"name":"Justin", "salary":3500}
{"name":"Berta", "salary":4000}


df_local=sparkdriver.read.format('json').load("file:///home/hadoop/employee.json")

df_local.show(5)
+-------+------+
|   name|salary|
+-------+------+
|Michael|  3000|
|   Andy|  4500|
| Justin|  3500|
|  Berta|  4000|
+-------+------+

df_local2=df_local.select('name','salary')
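
If the JSON layout is known up front, the schema can be supplied explicitly instead of letting Spark infer it (a sketch; the field names and types match the sample file above, and df_json_explicit is just an illustrative variable name):

from pyspark.sql.types import StructType, StructField, StringType, LongType

#An explicit schema skips the inference pass over the file
emp_schema=StructType([
    StructField('name', StringType(), True),
    StructField('salary', LongType(), True)
])
df_json_explicit=sparkdriver.read.format('json').schema(emp_schema).load('file:///home/hadoop/employee.json')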

#Writing into SQL

#save() returns None, so there is nothing useful to assign it to
df_local2.write.format('jdbc').\
option('url','jdbc:mysql://localhost:3306').\
option('driver','com.mysql.jdbc.Driver').\
option('user','hadoop').\
option('password','hadoop').\
option('dbtable','employee').\
mode('append').\
save()

#Check in MySQL
mysql> select * from employee;
+---------+--------+
| name    | salary |
+---------+--------+
| Michael |   3000 |
| Andy    |   4500 |
| Justin  |   3500 |
| Berta   |   4000 |
+---------+--------+
4 rows in set (0.00 sec)
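
For larger DataFrames the same write can be tuned (a sketch using the documented Spark JDBC options batchsize and numPartitions; the values below are illustrative):

#batchsize controls rows per JDBC insert round trip; numPartitions caps the
#number of parallel connections. Illustrative values only.
df_local2.write.format('jdbc').\
option('url','jdbc:mysql://localhost:3306').\
option('driver','com.mysql.jdbc.Driver').\
option('user','hadoop').\
option('password','hadoop').\
option('dbtable','employee').\
option('batchsize','1000').\
option('numPartitions','4').\
mode('append').\
save()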

#Reading XML files

Input file (records.xml):

<Records>
<Rec>
 <Name>John</Name>
 <Age>10</Age>
 <Gender>M</Gender>
</Rec>
<Rec>
 <Name>Jenny</Name>
 <Age>12</Age>
 <Gender>F</Gender>
</Rec>
<Rec>
 <Name>Janardhan</Name>
 <Age>14</Age>
 <Gender>M</Gender>
</Rec>
</Records>


#Launch pyspark with the spark-xml package on the classpath
pyspark --packages com.databricks:spark-xml_2.11:0.4.1
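
The same --packages flag works for batch jobs (a sketch; xml_job.py is a hypothetical script name):

spark-submit --packages com.databricks:spark-xml_2.11:0.4.1 xml_job.py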

from pyspark.sql import SparkSession

sparkdriver=SparkSession.builder.master('local').\
config('spark.jars.packages','com.databricks:spark-xml_2.11:0.4.1').\
config('spark.jars','file:///home/hadoop/Downloads/spark-xml-0.1.1-s_2.11.jar').\
appName('demoapp1').getOrCreate()

#rowTag selects the XML element that becomes one DataFrame row
df_xml=sparkdriver.read.format('xml').option('rowTag','Rec').load('file:///home/hadoop/records.xml')

df_xml.show()
+---+------+---------+
|Age|Gender|     Name|
+---+------+---------+
| 10|     M|     John|
| 12|     F|    Jenny|
| 14|     M|Janardhan|
+---+------+---------+

df_xml.select('Name','age').show(5)
+---------+---+
|     Name|age|
+---------+---+
|     John| 10|
|    Jenny| 12|
|Janardhan| 14|
+---------+---+
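
spark-xml also handles the write direction (a sketch using its documented rootTag and rowTag write options; the output path is illustrative):

#Write the DataFrame back out as XML; rootTag names the enclosing element,
#rowTag names each record element.
df_xml.write.format('xml').\
option('rootTag','Records').\
option('rowTag','Rec').\
save('file:///home/hadoop/records_out/')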

#Writing into ORC files, partitioned by Gender

df_xml.write.format('orc').partitionBy('Gender').save("file:///home/hadoop/xml_orc/")

Output:
------------

total 8
drwxr-xr-x 2 hadoop hadoop 4096 Feb 27 02:03 'Gender=F'
drwxr-xr-x 2 hadoop hadoop 4096 Feb 27 02:03 'Gender=M'
-rw-r--r-- 1 hadoop hadoop    0 Feb 27 02:03  _SUCCESS
hadoop@hadoop:~/xml_orc$ cd Gender=F
hadoop@hadoop:~/xml_orc/Gender=F$ ls
part-00000-79c6d983-a31d-4fc4-9117-95250f3d3cbb.c000.snappy.orc
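
Reading the partitioned output back (a sketch): Spark recovers Gender as a partition column from the directory names, so a filter on it prunes the scan to a single directory.

#Gender=F / Gender=M directory names become a partition column on read
df_orc=sparkdriver.read.format('orc').load('file:///home/hadoop/xml_orc/')
df_orc.filter("Gender='F'").show()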
