Multiline json parser in Spark with Scala

hdfs dfs -copyFromLocal myInput.json /user/

hdfs dfs -cat /user/myInput.json

"Year": "2013",
"First Name": "JANE",
"County": "A",
"Sex": "F",
"Count": "27"
}, {
"Year": "2013",
"First Name": "JADE",
"County": "B",
"Sex": "M",
"Count": "26"
}, {
"Year": "2013",
"First Name": "JAMES",
"County": "C",
"Sex": "M",
"Count": "21"

scala> import org.apache.spark.sql.SQLContext

import org.apache.spark.sql.SQLContext

scala> val sqlContext = new SQLContext(sc)

warning: there was one deprecation warning; re-run with -deprecation for details
sqlContext: org.apache.spark.sql.SQLContext = org.apache.spark.sql.SQLContext@3318d5ab

scala> val rdd1 = sc.wholeTextFiles("hdfs://localhost:9000/user/myInput.json").map(x => x._2)

rdd1: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[168] at map at <console>:28

scala> val namesJson =

warning: there was one deprecation warning; re-run with -deprecation for details
namesJson: org.apache.spark.sql.DataFrame = [Count: string, County: string ... 3 more fields]

scala> namesJson.printSchema
 |-- Count: string (nullable = true)
 |-- County: string (nullable = true)
 |-- First Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Year: string (nullable = true)


|Count|County|First Name|Sex|Year|
|   27|     A|      JANE|  F|2013|
|   26|     B|      JADE|  M|2013|
|   21|     C|     JAMES|  M|2013|

      "id": "59761c23b30d971669fb42ff",
      "isActive": true,
      "age": 36,
      "name": "Dunlap Hubbard",
      "gender": "male",
      "company": "CEDWARD",
      "email": "",
      "phone": "+1 (890) 543-2508",
      "address": "169 Rutledge Street, Konterra, Northern Mariana Islands, 8551"
      "id": "59761c233d8d0f92a6b0570d",
      "isActive": true,
      "age": 24,
      "name": "Kirsten Sellers",
      "gender": "female",
      "company": "EMERGENT",
      "email": "",
      "phone": "+1 (831) 564-2190",
      "address": "886 Gallatin Place, Fannett, Arkansas, 4656"
      "id": "59761c23fcb6254b1a06dad5",
      "isActive": true,
      "age": 30,
      "name": "Acosta Robbins",
      "gender": "male",
      "company": "ORGANICA",
      "email": "",
      "phone": "+1 (882) 441-3367",
      "address": "697 Linden Boulevard, Sattley, Idaho, 1035"

// it will result an Array with filePath as 1st field, json content as 2nd field
val rdd1 = sc.wholeTextFiles("hdfs://localhost:9000/user/server.json")

//Here we extract the 2nd field (json content) only. We ignore filePath (1st field)
scala> val rdd1 = sc.wholeTextFiles("hdfs://localhost:9000/user/server.json").map(x => x._2)

scala> val df =

warning: there was one deprecation warning; re-run with -deprecation for details
df: org.apache.spark.sql.DataFrame = [address: string, age: bigint ... 7 more fields]


|             address|age| company|               email|gender|                  id|isActive|           name|            phone|
|169 Rutledge Stre...| 36| CEDWARD|dunlaphubbard@ced...|  male|59761c23b30d97166...|    true| Dunlap Hubbard|+1 (890) 543-2508|
|886 Gallatin Plac...| 24|EMERGENT|kirstensellers@em...|female|59761c233d8d0f92a...|    true|Kirsten Sellers|+1 (831) 564-2190|
scala> df.filter($"age">30).show

|             address|age|company|               email|gender|                  id|isActive|          name|            phone|
|169 Rutledge Stre...| 36|CEDWARD|dunlaphubbard@ced...|  male|59761c23b30d97166...|    true|Dunlap Hubbard|+1 (890) 543-2508|

scala> df.filter($"gender"==="male").show

|             address|age| company|               email|gender|                  id|isActive|          name|            phone|
|169 Rutledge Stre...| 36| CEDWARD|dunlaphubbard@ced...|  male|59761c23b30d97166...|    true|Dunlap Hubbard|+1 (890) 543-2508|
|697 Linden Boulev...| 30|ORGANICA|acostarobbins@org...|  male|59761c23fcb6254b1...|    true|Acosta Robbins|+1 (882) 441-3367|

scala> df.where($"age">30 and $"gender" ==="male").show

|             address|age|company|               email|gender|                  id|isActive|          name|            phone|
|169 Rutledge Stre...| 36|CEDWARD|dunlaphubbard@ced...|  male|59761c23b30d97166...|    true|Dunlap Hubbard|+1 (890) 543-2508|


Mon Jan 20 00:00:00 -0800 2014, {"cl":"js","ua":"Mozilla/5.0 (Windows NT 6.1; rv:26.0) Gecko/20100101 Firefox/26.0","ip":"","cc":"US","rg":"WA","ct":"Everett","pc":"98208","mc":819,"bf":"cd21df8fdaa6eee6b8af906ab1345fe6ce797ad1","vst":"56941a03-b0bd-4f93-912c-f90ba0dc4159","lt":"Sun Jan 19 23:59:39 -0800 2014"}, {"v":"1.1","e":"block","et":"keyword","t":"10957643","u":"","pu":"","bk":"nude","pv":"98580188-4abd-4a1e-ae98-6e21be2d2c5d"}
Mon Jan 20 00:00:00 -0800 2014, {"cl":"js","ua":"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/32.0.1700.77 Safari/537.36","ip":"","cc":"US","rg":"IL","ct":"Chicago","pc":"60616","mc":602,"bf":"ec8498f87549a798e738fce5b312a791f1a4d683","vst":"761df6d3-8747-4939-b5f9-f2ecdb95b419","lt":"Mon Jan 20 02:00:00 -0600 2014"}, {"v":"1.1","e":"block","et":"keyword","t":"e0b5822c","u":"","pu":"","bk":"sex photos","pv":"8a529597-a4f0-45e8-9e7e-ef5ccb1aa169"}
Mon Jan 20 00:00:00 -0800 2014, {"cl":"js","ua":"Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/31.0.1650.63 Safari/537.36","ip":"","cc":"US","rg":"CA","ct":"Los Angeles","pc":"90025","mc":803,"bf":"b519456e03072cc4c58fa60908bca87e8a14fda3","vst":"2a1b62ff-78f5-4731-bc0d-4be3c1e1c69d","lt":"Sun Jan 19 23:59:58 -0800 2014"}, {"v":"1.1","e":"view","t":"0a196892","ab":20722,"u":"","seq":3,"tr":0.7,"af":true,"pv":"6d85dcf3-6f27-43a9-aa21-91fa46ee8908","pu":"","rpm":1.76,"pc":0.264,"nc":0.1,"cid":389}
Mon Jan 20 00:00:00 -0800 2014, {"cl":"js","ua":"Mozilla/4.0 (compatible; MSIE 8.0; Windows NT 5.1; Trident/4.0; GTB7.5; .NET CLR 2.0.50727; .NET CLR 3.0.4506.2152; .NET CLR 3.5.30729; McAfee)","ip":"","cc":"US","rg":"FL","ct":"Lake City","pc":"32024","mc":561,"bf":"c7b60dc38c3512c45dbb57af3cac175d4099d083","vst":"2eaa64ca-1700-499b-bd0a-4216a37b50af","lt":"Mon Jan 20 03:00:13 -0500 2014"}, {"v":"1.1","e":"view","t":"05c113f6","ab":18667,"u":"","seq":4,"tr":0.1,"af":true,"pv":"407dfa23-d2bc-4469-88cc-4c0a1b4c45d2","pu":"","rpm":0,"pc":0.2,"nc":0.15,"cid":44}
Mon Jan 20 00:00:00 -0800 2014, {"cl":"js","ua":"Mozilla/5.0 (Windows NT 6.2; WOW64; rv:26.0) Gecko/20100101 Firefox/26.0","ip":"","cc":"CO","rg":"34","ct":"Bogotá","mc":0,"bf":"454f55805932783a937972bcc648d3b8ef7d67a6","vst":"bbd8af36-4e42-4ee6-934d-c8610266e934","lt":"Mon Jan 20 02:59:56 -0500 2014"}, {"v":"1.1","e":"block","et":"user","t":"5cc3fdcf","u":"","pu":"","uid":154,"pv":"1bf1c548-1c4e-40e1-9c0c-9b19fb71d484"}

scala> val rdd1 = sc.textFile("hdfs://localhost:9000/user/ads/ad_events.txt")

rdd1: org.apache.spark.rdd.RDD[String] = hdfs://localhost:9000/user/ads/ad_events.txt
MapPartitionsRDD[45] at textFile at <console>:24

scala> rdd1.count

res22: Long = 29238

scala> rdd1.take(2).foreach(println)

Mon Jan 20 00:00:00 -0800 2014, {"cl":"js","ua":"Mozilla/5.0 (Windows NT 6.1; rv:26.0) Gecko/20100101 Firefox/26.0","ip":"","cc":"US","rg":"WA","ct":"Everett","pc":"98208","mc":819,"bf":"cd21df8fdaa6eee6b8af906ab1345fe6ce797ad1","vst":"56941a03-b0bd-4f93-912c-f90ba0dc4159","lt":"Sun Jan 19 23:59:39 -0800 2014"}, {"v":"1.1","e":"block","et":"keyword","t":"10957643","u":"","pu":"

Mon Jan 20 00:00:00 -0800 2014, {"cl":"js","ua":"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/32.0.1700.77 Safari/537.36","ip":"","cc":"US","rg":"IL","ct":"Chicago","pc":"60616","mc":602,"bf":"ec8498f87549a798e738fce5b312a791f1a4d683","vst":"761df6d3-8747-4939-b5f9-f2ecdb95b419","lt":"Mon Jan 20 02:00:00 -0600 2014"}, {"v":"1.1","e":"block","et":"keyword","t":"e0b5822c","u":"","pu":"","bk":"sex photos","pv":"8a529597-a4f0-45e8-9e7e-ef5ccb1aa169"}

scala> val pattern = "[0-9]{4}, ".r

pattern: scala.util.matching.Regex = [0-9]{4}
scala> val rdd2 = => x.split(pattern.findFirstIn(x).get))

rdd2: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[46] at map at <console>:27

// rdd2 has 2 fields

scala> rdd2.take(2)

res25: Array[Array[String]] = Array(Array("Mon Jan 20 00:00:00 -0800 ", {"cl":"js","ua":"Mozilla/5.0 (Windows NT 6.1; rv:26.0) Gecko/20100101 Firefox/26.0","ip":"","cc":"US","rg":"WA","ct":"Everett","pc":"98208","mc":819,"bf":"cd21df8fdaa6eee6b8af906ab1345fe6ce797ad1","vst":"56941a03-b0bd-4f93-912c-f90ba0dc4159","lt":"Sun Jan 19 23:59:39 -0800 2014"}, {"v":"1.1","e":"block","et":"keyword","t":"10957643","u":"","pu":"","bk":"nude","pv":"98580188-4abd-4a1e-ae98-6e2...



// we extract the 2nd field

scala> val rdd3 = => (x(1)))

rdd3: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[48] at map at <console>:25

scala> rdd3.take(2).foreach(println)

{"cl":"js","ua":"Mozilla/5.0 (Windows NT 6.1; rv:26.0) Gecko/20100101 Firefox/26.0","ip":"","cc":"US","rg":"WA","ct":"Everett","pc":"98208","mc":819,"bf":"cd21df8fdaa6eee6b8af906ab1345fe6ce797ad1","vst":"56941a03-b0bd-4f93-912c-f90ba0dc4159","lt":"Sun Jan 19 23:59:39 -0800 2014"}, {"v":"1.1","e":"block","et":"keyword","t":"10957643","u":"","pu":"","bk":"nude","pv":"98580188-4abd-4a1e-ae98-6e21be2d2c5d"}
{"cl":"js","ua":"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/32.0.1700.77 Safari/537.36","ip":"","cc":"US","rg":"IL","ct":"Chicago","pc":"60616","mc":602,"bf":"ec8498f87549a798e738fce5b312a791f1a4d683","vst":"761df6d3-8747-4939-b5f9-f2ecdb95b419","lt":"Mon Jan 20 02:00:00 -0600 2014"}, {"v":"1.1","e":"block","et":"keyword","t":"e0b5822c","u":"","pu":"","bk":"sex photos","pv":"8a529597-a4f0-45e8-9e7e-ef5ccb1aa169"}

// our json single line has {},{} -- but it reads just first set {}. 2nd set {} is ignored.

scala> val df1 =

warning: there was one deprecation warning; re-run with -deprecation for details

df1: org.apache.spark.sql.DataFrame = [bf: string, cc: string ... 9 more fields]
// this schema belongs to 1st set {}

scala> df1.printSchema
 |-- bf: string (nullable = true)
 |-- cc: string (nullable = true)
 |-- cl: string (nullable = true)
 |-- ct: string (nullable = true)
 |-- ip: string (nullable = true)
 |-- lt: string (nullable = true)
 |-- mc: long (nullable = true)
 |-- pc: string (nullable = true)
 |-- rg: string (nullable = true)
 |-- ua: string (nullable = true)
 |-- vst: string (nullable = true)
// partial data

scala> df1.columns.size

res31: Int = 11

// Here we merge 2 sets
scala> val rdd3 = => x(1)).map(x => x.replace("}, {",","))

rdd3: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[56] at map at <console>:25

scala> rdd3.take(2).foreach(println)

{"cl":"js","ua":"Mozilla/5.0 (Windows NT 6.1; rv:26.0) Gecko/20100101 Firefox/26.0","ip":"","cc":"US","rg":"WA","ct":"Everett","pc":"98208","mc":819,"bf":"cd21df8fdaa6eee6b8af906ab1345fe6ce797ad1","vst":"56941a03-b0bd-4f93-912c-f90ba0dc4159","lt":"Sun Jan 19 23:59:39 -0800 2014","v":"1.1","e":"block","et":"keyword","t":"10957643","u":"","pu":"","bk":"nude","pv":"98580188-4abd-4a1e-ae98-6e21be2d2c5d"}
{"cl":"js","ua":"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/32.0.1700.77 Safari/537.36","ip":"","cc":"US","rg":"IL","ct":"Chicago","pc":"60616","mc":602,"bf":"ec8498f87549a798e738fce5b312a791f1a4d683","vst":"761df6d3-8747-4939-b5f9-f2ecdb95b419","lt":"Mon Jan 20 02:00:00 -0600 2014","v":"1.1","e":"block","et":"keyword","t":"e0b5822c","u":"","pu":"","bk":"sex photos","pv":"8a529597-a4f0-45e8-9e7e-ef5ccb1aa169"}

// Here we included both the sets
scala> val df1 =

warning: there was one deprecation warning; re-run with -deprecation for details

df1: org.apache.spark.sql.DataFrame = [ab: bigint, af: boolean ... 32 more fields]

// this is perfect and we have got all the columns

scala> df1.printSchema
 |-- ab: long (nullable = true)
 |-- af: boolean (nullable = true)
 |-- ai: string (nullable = true)
 |-- bf: string (nullable = true)
 |-- bk: string (nullable = true)
 |-- cc: string (nullable = true)
 |-- cid: long (nullable = true)
 |-- cl: string (nullable = true)
 |-- cre: string (nullable = true)
 |-- ct: string (nullable = true)
 |-- du: string (nullable = true)
 |-- e: string (nullable = true)
 |-- et: string (nullable = true)
 |-- ip: string (nullable = true)
 |-- lt: string (nullable = true)
 |-- mc: long (nullable = true)
 |-- nc: double (nullable = true)
 |-- pc: string (nullable = true)
 |-- pc: double (nullable = true)
 |-- pu: string (nullable = true)
 |-- pv: string (nullable = true)
 |-- rg: string (nullable = true)
 |-- rpc: double (nullable = true)
 |-- rpm: double (nullable = true)
 |-- rpv: double (nullable = true)
 |-- seq: long (nullable = true)
 |-- t: string (nullable = true)
 |-- tr: double (nullable = true)
 |-- u: string (nullable = true)
 |-- ua: string (nullable = true)
 |-- uid: long (nullable = true)
 |-- v: string (nullable = true)
 |-- vst: string (nullable = true)
 |-- vtw: long (nullable = true)
scala> df1.columns.size
res35: Int = 34

// drop a column named "pc"

scala> val df2 = df1.drop("pc")

df2: org.apache.spark.sql.DataFrame = [ab: bigint, af: boolean ... 30 more fields]

scala> df2.printSchema
 |-- ab: long (nullable = true)
 |-- af: boolean (nullable = true)
 |-- ai: string (nullable = true)
 |-- bf: string (nullable = true)
 |-- bk: string (nullable = true)
 |-- cc: string (nullable = true)
 |-- cid: long (nullable = true)
 |-- cl: string (nullable = true)
 |-- cre: string (nullable = true)
 |-- ct: string (nullable = true)
 |-- du: string (nullable = true)
 |-- e: string (nullable = true)
 |-- et: string (nullable = true)
 |-- ip: string (nullable = true)
 |-- lt: string (nullable = true)
 |-- mc: long (nullable = true)
 |-- nc: double (nullable = true)
 |-- pu: string (nullable = true)
 |-- pv: string (nullable = true)
 |-- rg: string (nullable = true)
 |-- rpc: double (nullable = true)
 |-- rpm: double (nullable = true)
 |-- rpv: double (nullable = true)
 |-- seq: long (nullable = true)
 |-- t: string (nullable = true)
 |-- tr: double (nullable = true)
 |-- u: string (nullable = true)
 |-- ua: string (nullable = true)
 |-- uid: long (nullable = true)
 |-- v: string (nullable = true)
 |-- vst: string (nullable = true)
 |-- vtw: long (nullable = true)

scala> df2.write.mode("append").saveAsTable("default.jsondf")
scala> df2.createOrReplaceTempView("test")

scala> spark.sql("select * from test").show(10)

scala> spark.sql("select * from default.jsonexa").show

