How to do groupBy Aggregation in Spark with Scala
Input
file2.csv (as shown by hdfs dfs -cat /user/file2.csv):
--------------
s1,d1
s1,d2
s1,d2
s1,d3
s2,d1
s2,d3
s2,d1
s3,d2
s3,d1
s1,d1
s2,d1
s3,d1
s1,d1
s2,d2
s3,d3
scala> val df = spark.read.format("csv").option("inferSchema","true").load("hdfs://localhost:9000/user/file2.csv")
// autogenerated column headers, since the file has no header row
df: org.apache.spark.sql.DataFrame = [_c0: string, _c1: string]
scala> df.show
+---+---+
|_c0|_c1|
+---+---+
| s1| d1|
| s1| d2|
| s1| d2|
| s1| d3|
| s2| d1|
| s2| d3|
| s2| d1|
| s3| d2|
| s3| d1|
| s1| d1|
| s2| d1|
| s3| d1|
| s1| d1|
| s2| d2|
| s3| d3|
+---+---+
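Since the file has no header row, Spark falls back to the autogenerated names _c0 and _c1. If the CSV did start with a header line, the names could be taken from it instead; a minimal sketch, assuming a hypothetical header such as School,Department:
// sketch: only applies if file2.csv carried a header row like "School,Department"
val dfWithHeader = spark.read.format("csv")
  .option("header", "true")        // read column names from the first line
  .option("inferSchema", "true")
  .load("hdfs://localhost:9000/user/file2.csv")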
// groupBy aggregation on the autogenerated column names
scala> df.groupBy("_c0","_c1").agg(count("*")).show
+---+---+--------+
|_c0|_c1|count(1)|
+---+---+--------+
| s3| d2| 1|
| s2| d2| 1|
| s1| d2| 2|
| s1| d1| 3|
| s3| d1| 2|
| s2| d1| 3|
| s3| d3| 1|
| s1| d3| 1|
| s2| d3| 1|
+---+---+--------+
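The same count can also be expressed in Spark SQL; a minimal sketch, assuming the df read above (the view name schools_departments is only illustrative):
// register the DataFrame as a temporary view and group in SQL
df.createOrReplaceTempView("schools_departments")
spark.sql("""
  SELECT _c0, _c1, count(*) AS cnt
  FROM schools_departments
  GROUP BY _c0, _c1
""").show()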
// To define our own schema we need the following import
scala> import org.apache.spark.sql.types._
import org.apache.spark.sql.types._
// Defining the schema
scala> val sch = StructType(StructField("School",StringType)::StructField("Department",StringType)::Nil)
sch: org.apache.spark.sql.types.StructType = StructType(StructField(School,StringType,true), StructField(Department,StringType,true))
// Applying the schema (the inferSchema option is now redundant, since the schema is supplied explicitly)
scala> val df = spark.read.format("csv").option("inferSchema","true").schema(sch).load("hdfs://localhost:9000/user/file2.csv")
df: org.apache.spark.sql.DataFrame = [School: string, Department: string]
scala> df.printSchema
root
|-- School: string (nullable = true)
|-- Department: string (nullable = true)
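As an aside, the same two-column schema can also be passed as a DDL-formatted string instead of building a StructType by hand (this overload of schema() exists in Spark 2.3+); a small sketch:
// equivalent read with the schema given as a DDL string
val dfDdl = spark.read.format("csv")
  .schema("School STRING, Department STRING")
  .load("hdfs://localhost:9000/user/file2.csv")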
scala> df.show
+------+----------+
|School|Department|
+------+----------+
| s1| d1|
| s1| d2|
| s1| d2|
| s1| d3|
| s2| d1|
| s2| d3|
| s2| d1|
| s3| d2|
| s3| d1|
| s1| d1|
| s2| d1|
| s3| d1|
| s1| d1|
| s2| d2|
| s3| d3|
+------+----------+
// groupBy aggregation with the named columns
// an alias is added for the count column
scala> df.groupBy("School","Department").agg(count("*") as "Count").show
+------+----------+-----+
|School|Department|Count|
+------+----------+-----+
| s3| d2| 1|
| s2| d2| 1|
| s1| d2| 2|
| s1| d1| 3|
| s3| d1| 2|
| s2| d1| 3|
| s3| d3| 1|
| s1| d3| 1|
| s2| d3| 1|
+------+----------+-----+
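If an ordered result is wanted, an orderBy can be chained onto the same aggregation; a small sketch (the functions import is written out so the snippet is self-contained):
import org.apache.spark.sql.functions.{count, desc}
// same aggregation, largest counts first
df.groupBy("School", "Department")
  .agg(count("*").as("Count"))
  .orderBy(desc("Count"))
  .show()
Without the orderBy, the order of the grouped rows is not guaranteed, as the unsorted output above shows.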