PYSPARK RDBMS OPERATIONS
from pyspark.sql import SparkSession
sparkdriver=SparkSession.builder.master('local').appName('demoapp').\
config('spark.jars.packages','mysql:mysql-connector-java:5.1.44').\
getOrCreate()
df_mysql=sparkdriver.read.format('jdbc').\
option('url','jdbc:mysql://localhost:3306').\
option('user','hadoop').\
option('password','hadoop').\
option('dbtable','world.city').\
load()
# Instead of 'dbtable', a query can be pushed down to MySQL (it then runs on the
# server side and only the result is sent back to Spark), e.g.:
# option('query','select CountryCode, count(*) from world.city group by CountryCode')
df_mysql.show()
+---+----------------+-----------+-------------+----------+
| ID| Name|CountryCode| District|Population|
+---+----------------+-----------+-------------+----------+
| 1| Kabul| AFG| Kabol| 1780000|
| 2| Qandahar| AFG| Qandahar| 237500|
| 3| Herat| AFG| Herat| 186800|
| 4| Mazar-e-Sharif| AFG| Balkh| 127800|
| 5| Amsterdam| NLD|Noord-Holland| 731200|
| 6| Rotterdam| NLD| Zuid-Holland| 593321|
| 7| Haag| NLD| Zuid-Holland| 440900|
| 8| Utrecht| NLD| Utrecht| 234323|
| 9| Eindhoven| NLD|Noord-Brabant| 201843|
| 10| Tilburg| NLD|Noord-Brabant| 193238|
| 11| Groningen| NLD| Groningen| 172701|
| 12| Breda| NLD|Noord-Brabant| 160398|
| 13| Apeldoorn| NLD| Gelderland| 153491|
| 14| Nijmegen| NLD| Gelderland| 152463|
| 15| Enschede| NLD| Overijssel| 149544|
| 16| Haarlem| NLD|Noord-Holland| 148772|
| 17| Almere| NLD| Flevoland| 142465|
| 18| Arnhem| NLD| Gelderland| 138020|
| 19| Zaanstad| NLD|Noord-Holland| 135621|
| 20|'s-Hertogenbosch| NLD|Noord-Brabant| 129170|
+---+----------------+-----------+-------------+----------+
only showing top 20 rows
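For aggregations it is usually cheaper to push the query down to the database than to pull the whole table into Spark. A minimal sketch, assuming Spark 2.4+ (which added the 'query' option) and using the CountryCode column visible above; note that 'query' and 'dbtable' cannot be combined in one read:
df_counts=sparkdriver.read.format('jdbc').\
option('url','jdbc:mysql://localhost:3306').\
option('user','hadoop').\
option('password','hadoop').\
option('query','select CountryCode, count(*) as city_count from world.city group by CountryCode').\
load()
# MySQL executes the GROUP BY; Spark receives only one row per country
df_counts.show(5)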
df_mysql=sparkdriver.read.format('jdbc').\
options(url='jdbc:mysql://localhost:3306',user='hadoop',password='hadoop',dbtable='world.city').\
load()
df_mysql.show(10)
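For large tables the single-connection read above becomes a bottleneck. The JDBC source can open several parallel connections if it is told how to split the table; a sketch assuming the numeric ID column shown earlier (the bounds 1 and 5000 are illustrative, not taken from the data):
df_parallel=sparkdriver.read.format('jdbc').\
option('url','jdbc:mysql://localhost:3306').\
option('user','hadoop').\
option('password','hadoop').\
option('dbtable','world.city').\
option('partitionColumn','ID').\
option('lowerBound','1').\
option('upperBound','5000').\
option('numPartitions','4').\
load()
# four concurrent JDBC connections, each reading one ID range
print(df_parallel.rdd.getNumPartitions())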
Function Explanation:
from pyspark.sql.functions import *
sparkdriver=SparkSession.builder.master('local').appName('demoapp').\
config('spark.jars.packages','mysql:mysql-connector-java:5.1.44').\
getOrCreate()
f1=sparkdriver.sql("show functions")   # DataFrame listing all registered SQL functions
print(type(f1))
print(f1.count())                      # number of functions available in this session
#f1.show(296)
sparkdriver.sql('describe function aggregate').collect()
<class 'pyspark.sql.dataframe.DataFrame'>
296
Out[7]:
[Row(function_desc=u'Function: aggregate'),
 Row(function_desc=u'Class: org.apache.spark.sql.catalyst.expressions.ArrayAggregate'),
 Row(function_desc=u'Usage: aggregate(expr, start, merge, finish) - Applies a binary operator to an initial state and all elements in the array, and reduces this to a single state. The final state is converted into the final result by applying a finish function.')]
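The description can be verified directly in SQL; a quick check, assuming Spark 2.4+ where aggregate() is available:
sparkdriver.sql("select aggregate(array(1, 2, 3), 0, (acc, x) -> acc + x) as total").show()
# total = 6 (0 + 1 + 2 + 3)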
A common pipeline: read from the source on a daily basis, apply transformations, and load the result into the warehouse, partitioned by load date, e.g.:
30/11-->data1
01/12-->data2
02/12-->data3
df_mysql.show(5)
df1=df_mysql.withColumn("day",current_date())                   # stamp each row with today's date
df1=df_mysql.withColumn("day",current_timestamp())              # or with the full timestamp
df1=df_mysql.withColumn("day",date_add(current_timestamp(),1))  # tomorrow, from a timestamp
df1=df_mysql.withColumn("day",date_add(current_date(),1))       # tomorrow, from a date
df1.select('day',year('day'),month('day'),hour('day')).show(5)
df1.write.format('json').partitionBy('day').mode('append').save(" ")   # supply an output path
df1.show()
df1.write.partitionBy('day').mode('append').saveAsTable('calldatatable')
print('success')
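Once the table is partitioned by day, a downstream job can read back just one partition instead of the whole table; a minimal sketch against the table created above (the filter value matches the date_add(current_date(),1) stamp used earlier):
daily=sparkdriver.sql("select * from calldatatable where day = date_add(current_date(), 1)")
daily.show(5)   # partition pruning: only the matching 'day' directory is scanned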
from pyspark.sql.types import *
def extract(x):
    # split '2014-03-15T22:34:23' into its date parts
    x1=x.split('T')[0].split('-')
    year=int(x1[0])
    month=int(x1[1])
    day=int(x1[2])
    return [year,month,day]
print(extract('2014-03-15T22:34:23'))
[2014, 3, 15]
# the function returns a list of ints, so register it with a matching return type
sparkdriver.udf.register('myfun',extract,ArrayType(IntegerType()))
myfunc=udf(extract,ArrayType(IntegerType()))
# 'endtime' assumes a timestamp-string column in the source table (world.city has none)
df_mysql.select('endtime',myfunc('endtime')).show(5)
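The registered name can also be called straight from SQL; a quick sanity check on a literal:
sparkdriver.sql("select myfun('2014-03-15T22:34:23') as ymd").show()   # ymd = [2014, 3, 15]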