Wednesday, February 27, 2019

PYSPARK RDBMS OPERATIONS

from pyspark.sql import SparkSession

# Pull the MySQL JDBC driver from Maven when the session starts
sparkdriver = (SparkSession.builder.master('local').appName('demoapp')
    .config('spark.jars.packages', 'mysql:mysql-connector-java:5.1.44')
    .getOrCreate())

# Use either 'dbtable' or 'query', never both. A 'query' runs on the
# MySQL server side, so only its result is sent to Spark.
df_mysql = (sparkdriver.read.format('jdbc')
    .option('url', 'jdbc:mysql://localhost:3306')
    .option('user', 'hadoop')
    .option('password', 'hadoop')
    .option('dbtable', 'world.city')
    # .option('query', 'select count(*) from world.city group by ...')
    .load())

df_mysql.show()

+---+----------------+-----------+-------------+----------+
| ID|            Name|CountryCode|     District|Population|
+---+----------------+-----------+-------------+----------+
|  1|           Kabul|        AFG|        Kabol|   1780000|
|  2|        Qandahar|        AFG|     Qandahar|    237500|
|  3|           Herat|        AFG|        Herat|    186800|
|  4|  Mazar-e-Sharif|        AFG|        Balkh|    127800|
|  5|       Amsterdam|        NLD|Noord-Holland|    731200|
|  6|       Rotterdam|        NLD| Zuid-Holland|    593321|
|  7|            Haag|        NLD| Zuid-Holland|    440900|
|  8|         Utrecht|        NLD|      Utrecht|    234323|
|  9|       Eindhoven|        NLD|Noord-Brabant|    201843|
| 10|         Tilburg|        NLD|Noord-Brabant|    193238|
| 11|       Groningen|        NLD|    Groningen|    172701|
| 12|           Breda|        NLD|Noord-Brabant|    160398|
| 13|       Apeldoorn|        NLD|   Gelderland|    153491|
| 14|        Nijmegen|        NLD|   Gelderland|    152463|
| 15|        Enschede|        NLD|   Overijssel|    149544|
| 16|         Haarlem|        NLD|Noord-Holland|    148772|
| 17|          Almere|        NLD|    Flevoland|    142465|
| 18|          Arnhem|        NLD|   Gelderland|    138020|
| 19|        Zaanstad|        NLD|Noord-Holland|    135621|
| 20|´s-Hertogenbosch|        NLD|Noord-Brabant|    129170|
+---+----------------+-----------+-------------+----------+

only showing top 20 rows
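
The `dbtable` and `query` options are mutually exclusive, and a `query` is executed by MySQL so only the result travels over JDBC. Here is a minimal sketch of building those options in one place. `jdbc_options` and `read_jdbc` are hypothetical helper names, not part of PySpark; the `query` option assumes Spark 2.4 or later; and the grouping column `CountryCode` is only an illustration, since the query in the post is cut off.

```python
def jdbc_options(url, user, password, table=None, query=None):
    """Build the option dict for a JDBC read (hypothetical helper).

    Exactly one of `table` or `query` must be given, mirroring Spark's
    rule that 'dbtable' and 'query' cannot be combined.
    """
    if (table is None) == (query is None):
        raise ValueError("pass exactly one of table or query")
    opts = {"url": url, "user": user, "password": password}
    if table is not None:
        opts["dbtable"] = table
    else:
        opts["query"] = query
    return opts


def read_jdbc(spark, opts):
    """Load a DataFrame through JDBC using an existing SparkSession."""
    return spark.read.format("jdbc").options(**opts).load()


# Illustrative pushdown query; the grouping column is an assumption.
opts = jdbc_options(
    "jdbc:mysql://localhost:3306", "hadoop", "hadoop",
    query="select CountryCode, count(*) as cities "
          "from world.city group by CountryCode",
)
```

With a running session this would be used as `df_mysql = read_jdbc(sparkdriver, opts)`.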

df_mysql=sparkdriver.read.format('jdbc').\
options(url='jdbc:mysql://localhost:3306',password='hadoop',user='hadoop',dbtable='world.city').\
load()

df_mysql.show(10)

Function Explanation:

from pyspark.sql.functions import *

sparkdriver=SparkSession.builder.master('local').appName('demoapp').\
config('spark.jars.packages','mysql:mysql-connector-java:5.1.44').\
getOrCreate()

f1=sparkdriver.sql("show functions")

print(type(f1))

print(f1.count())

#f1.show(296)

sparkdriver.sql('describe function aggregate').collect()

<class 'pyspark.sql.dataframe.DataFrame'>
296

Out[7]:
[Row(function_desc=u'Function: aggregate'),
 Row(function_desc=u'Class: org.apache.spark.sql.catalyst.expressions.ArrayAggregate'),
 Row(function_desc=u'Usage: \n      aggregate(expr, start, merge, finish) - Applies a binary operator to an initial state and all\n      elements in the array, and reduces this to a single state. The final state is converted\n      into the final result by applying a finish function.\n    ')]
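
To make the description of `aggregate` concrete, here is a plain-Python model of its semantics, not Spark's implementation. The Python function named `aggregate` below is a stand-in written for this sketch; it mirrors the SQL form `aggregate(array(1, 2, 3), 0, (acc, x) -> acc + x, acc -> acc * 10)`.

```python
from functools import reduce


def aggregate(values, start, merge, finish=lambda acc: acc):
    """Model of aggregate(expr, start, merge, finish).

    `merge` folds every element into the running state, starting from
    `start`; `finish` converts the final state into the result.
    """
    return finish(reduce(merge, values, start))


total = aggregate([1, 2, 3], 0, lambda acc, x: acc + x)
scaled = aggregate([1, 2, 3], 0, lambda acc, x: acc + x, lambda acc: acc * 10)
print(total, scaled)  # 6 60
```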

Typical daily pipeline: read from the source --> transform --> load into the warehouse, one batch per day:
30/11 --> data1
1/12 --> data2
2/12 --> data3

df_mysql.show(5)

df1=df_mysql.withColumn("day",current_date())

df1=df_mysql.withColumn("day",current_timestamp())

df1=df_mysql.withColumn("day",date_add(current_timestamp(),1))

df1=df_mysql.withColumn("day",date_add(current_date(),1))

df1.select('end_time','day',year('day'),month('day'),hour('day')).show(5)

df1.write.format('json').partitionBy('day').mode('append').save("  ")

df1.show()

df1.write.partitionBy('day').mode('append').saveAsTable('calldatatable')

print('success')
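
Writing with `partitionBy('day')` in append mode places each day's rows in their own `day=<value>` directory, so a daily run only adds a new directory. Here is a rough plain-Python sketch of that layout, together with the one-day shift that `date_add(current_date(),1)` performs. The base path `/tmp/calldata`, the year 2018, and both helper names are assumptions made for illustration.

```python
from datetime import date, timedelta


def next_day(d):
    """Plain-Python counterpart of date_add(d, 1)."""
    return d + timedelta(days=1)


def partition_dir(base, column, value):
    """Directory that partitionBy(column) would use for one value."""
    return "{}/{}={}".format(base, column, value)


# 30 November rolls over to 1 December.
load_day = next_day(date(2018, 11, 30))
print(partition_dir("/tmp/calldata", "day", load_day))
# /tmp/calldata/day=2018-12-01
```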

from pyspark.sql.types import *

def extract(x):
  x1=x.split('T')[0].split('-')
  year=int(x1[0])
  month=int(x1[1])
  day=int(x1[2])
  return [year,month,day]

print(extract('2014-03-15T22:34:23'))

[2014, 3, 15]

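# extract returns a list of ints, so declare an array return type
sparkdriver.udf.register('myfun',extract,ArrayType(IntegerType()))

myfunc=udf(extract,ArrayType(IntegerType()))

df_mysql.select('endtime',myfunc('endtime'))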
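
A slightly more defensive variant of the same idea, as a sketch: it parses with `datetime.strptime` instead of splitting the string, and passes `None` through because Spark hands SQL NULL to a Python UDF as `None`. `extract_ymd` and `make_ymd_udf` are hypothetical names, and the timestamp layout is assumed to match the example above.

```python
from datetime import datetime


def extract_ymd(ts):
    """Return [year, month, day] from a 'YYYY-MM-DDTHH:MM:SS' string."""
    if ts is None:  # SQL NULL arrives as None
        return None
    parsed = datetime.strptime(ts, "%Y-%m-%dT%H:%M:%S")
    return [parsed.year, parsed.month, parsed.day]


def make_ymd_udf():
    """Wrap extract_ymd as a Spark UDF; pyspark is imported only here."""
    from pyspark.sql.functions import udf
    from pyspark.sql.types import ArrayType, IntegerType
    return udf(extract_ymd, ArrayType(IntegerType()))


print(extract_ymd("2014-03-15T22:34:23"))  # [2014, 3, 15]
```

For plain year, month and day extraction, the built-in `year`, `month` and `dayofmonth` functions avoid the cost of a Python UDF altogether.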
