Tuesday, February 19, 2019

How to do groupBy Aggregation in Spark with Scala

How to do groupBy Aggregation in Spark with Scala



Input
file2.csv:
--------------

hdfs dfs -cat /user/file2.csv

s1,d1
s1,d2
s1,d2
s1,d3
s2,d1
s2,d3
s2,d1
s3,d2
s3,d1
s1,d1
s2,d1
s3,d1
s1,d1
s2,d2
s3,d3


// The CSV has no header row, so Spark auto-generates the column names (_c0, _c1)

scala> val df = spark.read.format("csv").option("inferSchema","true").load("hdfs://localhost:9000/user/file2.csv")

df: org.apache.spark.sql.DataFrame = [_c0: string, _c1: string]

scala> df.show
+---+---+
|_c0|_c1|
+---+---+
| s1| d1|
| s1| d2|
| s1| d2|
| s1| d3|
| s2| d1|
| s2| d3|
| s2| d1|
| s3| d2|
| s3| d1|
| s1| d1|
| s2| d1|
| s3| d1|
| s1| d1|
| s2| d2|
| s3| d3|
+---+---+

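// groupBy aggregation: count the rows for each (_c0, _c1) combination
// (count comes from org.apache.spark.sql.functions, which spark-shell imports automatically)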

scala> df.groupBy("_c0","_c1").agg(count("*")).show

+---+---+--------+                                                           
|_c0|_c1|count(1)|
+---+---+--------+
| s3| d2|       1|
| s2| d2|       1|
| s1| d2|       2|
| s1| d1|       3|
| s3| d1|       2|
| s2| d1|       3|
| s3| d3|       1|
| s1| d3|       1|
| s2| d3|       1|
+---+---+--------+

// To define our own schema (custom column names and types), import the Spark SQL types

scala> import org.apache.spark.sql.types._
import org.apache.spark.sql.types._

// Define the schema: two nullable string columns, School and Department
scala> val sch = StructType(StructField("School",StringType)::StructField("Department",StringType)::Nil)
sch: org.apache.spark.sql.types.StructType = StructType(StructField(School,StringType,true), StructField(Department,StringType,true))

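// Apply the schema while reading the file
// (with an explicit schema, the inferSchema option is redundant and can be dropped)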

scala> val df = spark.read.format("csv").option("inferSchema","true").schema(sch).load("hdfs://localhost:9000/user/file2.csv")
df: org.apache.spark.sql.DataFrame = [School: string, Department: string]

scala> df.printSchema
root
 |-- School: string (nullable = true)
 |-- Department: string (nullable = true)

scala> df.show
+------+----------+
|School|Department|
+------+----------+
|    s1|        d1|
|    s1|        d2|
|    s1|        d2|
|    s1|        d3|
|    s2|        d1|
|    s2|        d3|
|    s2|        d1|
|    s3|        d2|
|    s3|        d1|
|    s1|        d1|
|    s2|        d1|
|    s3|        d1|
|    s1|        d1|
|    s2|        d2|
|    s3|        d3|
+------+----------+

// groupBy Aggregation operation

// Alias the aggregate (3rd) column as "Count" instead of the default name count(1)
scala> df.groupBy("School","Department").agg(count("*") as "Count").show
+------+----------+-----+
|School|Department|Count|
+------+----------+-----+
|    s3|        d2|    1|
|    s2|        d2|    1|
|    s1|        d2|    2|
|    s1|        d1|    3|
|    s3|        d1|    2|
|    s2|        d1|    3|
|    s3|        d3|    1|
|    s1|        d3|    1|
|    s2|        d3|    1|
+------+----------+-----+

No comments:

Post a Comment

Python Challenges Program

Challenges program: program 1: #Input :ABAABBCA #Output: A4B3C1 str1="ABAABBCA" str2="" d={} for x in str1: d[x]=d...