Friday, March 1, 2019

Spark with MongoDB - Read / Write Operations



$ mongo // start CLI

MongoDB shell version v3.6.3
connecting to: mongodb://127.0.0.1:27017
MongoDB server version: 3.6.3

// Insert two documents into the person collection
> db.person.insert([
    { "id": 100, "name": "Sankara", "salary": 3000, "city": "Pallathur" },
    { "id": 101, "name": "Rasee", "salary": 3100, "city": "Kanadukathan" }
])
BulkWriteResult({
    "writeErrors" : [ ],
    "writeConcernErrors" : [ ],
    "nInserted" : 2,
    "nUpserted" : 0,
    "nMatched" : 0,
    "nModified" : 0,
    "nRemoved" : 0,
    "upserted" : [ ]
})

// select all documents
> db.person.find()
{ "_id" : ObjectId("5c780db0d44e3fc2ebd26d43"), "id" : 100, "name" : "Sankara", "salary" : 3000, "city" : "Pallathur" }
{ "_id" : ObjectId("5c780db0d44e3fc2ebd26d44"), "id" : 101, "name" : "Rasee", "salary" : 3100, "city" : "Kanadukathan" }

// pretty-print the result
> db.person.find().pretty()
{
    "_id" : ObjectId("5c780db0d44e3fc2ebd26d43"),
    "id" : 100,
    "name" : "Sankara",
    "salary" : 3000,
    "city" : "Pallathur"
}
{
    "_id" : ObjectId("5c780db0d44e3fc2ebd26d44"),
    "id" : 101,
    "name" : "Rasee",
    "salary" : 3100,
    "city" : "Kanadukathan"
}

// search with a condition
> db.person.find({"name":"Sankara"})
{ "_id" : ObjectId("5c780db0d44e3fc2ebd26d43"), "id" : 100, "name" : "Sankara", "salary" : 3000, "city" : "Pallathur" }


// AND and OR conditions

// $and: no document matches both name "Sankara" and id 101, so nothing is returned
> db.person.find({$and:[{"name":"Sankara"},{"id":101}]})

// $or: returns documents matching either condition
> db.person.find({$or:[{"name":"Sankara"},{"id":101}]})
{ "_id" : ObjectId("5c799a6092b6d263e5f35ba8"), "id" : 100, "name" : "Sankara", "salary" : 3000, "city" : "Pallathur" }
{ "_id" : ObjectId("5c799a6092b6d263e5f35ba9"), "id" : 101, "name" : "Rasee", "salary" : 3100, "city" : "Kanadukathan" }

// Updating a record in MongoDB
// Note: update() modifies only the first matching document unless {multi:true} is passed.
// The collection here also holds ids 500-502, apparently from an earlier run of the
// Spark write shown later in this post.

> db.person.find()

{ "_id" : ObjectId("5c799a6092b6d263e5f35ba8"), "id" : 100, "name" : "Sankara", "salary" : 3000, "city" : "Pallathur" }
{ "_id" : ObjectId("5c799a6092b6d263e5f35ba9"), "id" : 101, "name" : "Rasee", "salary" : 3100, "city" : "Kanadukathan" }
{ "_id" : ObjectId("5c79a09765e80e21e2e727ce"), "id" : 500, "name" : "Sathya", "city" : "Bangalore", "salary" : 3000 }
{ "_id" : ObjectId("5c79a09765e80e21e2e727cf"), "id" : 501, "name" : "Lakshmi", "city" : "Chennai", "salary" : 3100 }
{ "_id" : ObjectId("5c79a09765e80e21e2e727d0"), "id" : 502, "name" : "Kalai", "city" : "Perai", "salary" : 3200 }


> db.person.update({"id":100},{$set:{"name":"Arun"}})
WriteResult({ "nMatched" : 1, "nUpserted" : 0, "nModified" : 1 })


> db.person.find()

{ "_id" : ObjectId("5c799a6092b6d263e5f35ba8"), "id" : 100, "name" : "Arun", "salary" : 3000, "city" : "Pallathur" }
{ "_id" : ObjectId("5c799a6092b6d263e5f35ba9"), "id" : 101, "name" : "Rasee", "salary" : 3100, "city" : "Kanadukathan" }
{ "_id" : ObjectId("5c79a09765e80e21e2e727ce"), "id" : 500, "name" : "Sathya", "city" : "Bangalore", "salary" : 3000 }
{ "_id" : ObjectId("5c79a09765e80e21e2e727cf"), "id" : 501, "name" : "Lakshmi", "city" : "Chennai", "salary" : 3100 }
{ "_id" : ObjectId("5c79a09765e80e21e2e727d0"), "id" : 502, "name" : "Kalai", "city" : "Perai", "salary" : 3200 }

// Spark starts here

spark-shell --packages org.mongodb.spark:mongo-spark-connector_2.11:2.4.0
 
import com.mongodb.spark._
import com.mongodb.spark.config.ReadConfig
import com.mongodb.spark.sql._
import org.bson.Document
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("MongoPerson").master("local[*]").getOrCreate()
val readConfig = ReadConfig(Map("uri" -> "mongodb://127.0.0.1/", "database" -> "test", "collection" -> "person"))
val df = spark.read.mongo(readConfig)  // loads test.person into a DataFrame
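
The same load can also be done without the implicit `mongo` helper. A minimal alternative sketch, assuming the same 2.x connector and the readConfig defined above:

// equivalent load via the MongoSpark helper object
val dfAlt = MongoSpark.load(spark, readConfig)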


scala> df.show
+--------------------+------------+-----+-------+------+
|                 _id|        city|   id|   name|salary|
+--------------------+------------+-----+-------+------+
|[5c780db0d44e3fc2...|   Pallathur|100.0|Sankara|3000.0|
|[5c780db0d44e3fc2...|Kanadukathan|101.0|  Rasee|3100.0|
+--------------------+------------+-----+-------+------+


scala> df.select("city","id","name","salary").show
+------------+-----+-------+------+
|        city|   id|   name|salary|
+------------+-----+-------+------+
|   Pallathur|100.0|Sankara|3000.0|
|Kanadukathan|101.0|  Rasee|3100.0|
+------------+-----+-------+------+
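
Notice that id and salary come back as 100.0 and 3000.0: the mongo shell stores numeric literals as 64-bit doubles, so the connector infers double columns. A quick sketch of casting them to integers (spark-shell already has spark.implicits._ in scope for the $ syntax):

// cast the double columns to int if integer semantics are wanted
val typed = df.select($"id".cast("int"), $"name", $"city", $"salary".cast("int"))
typed.show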

scala> df.createOrReplaceTempView("tblPerson") // register as a temp view for Spark SQL

scala> spark.sql("select * from tblPerson").show
+--------------------+------------+-----+-------+------+
|                 _id|        city|   id|   name|salary|
+--------------------+------------+-----+-------+------+
|[5c780db0d44e3fc2...|   Pallathur|100.0|Sankara|3000.0|
|[5c780db0d44e3fc2...|Kanadukathan|101.0|  Rasee|3100.0|
+--------------------+------------+-----+-------+------+

scala> spark.sql("select id,name,city,salary from tblPerson").filter("salary==3000").show
+-----+-------+---------+------+
|   id|   name|     city|salary|
+-----+-------+---------+------+
|100.0|Sankara|Pallathur|3000.0|
+-----+-------+---------+------+
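
The same filter can be expressed with the DataFrame API directly, without registering a temp view; a quick sketch:

// DataFrame-API equivalent of the SQL query above
df.select("id", "name", "city", "salary").filter($"salary" === 3000).show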


// Building an in-memory collection in Spark
scala> case class Person(id:Int, name:String, city:String, salary:Int)
defined class Person

scala> val ob1 = new Person(500,"Sathya","Bangalore",3000)
ob1: Person = Person(500,Sathya,Bangalore,3000)

scala> val ob2 = new Person(501,"Lakshmi","Chennai",3100)
ob2: Person = Person(501,Lakshmi,Chennai,3100)

scala> val ob3 = new Person(502,"Kalai","Perai",3200)
ob3: Person = Person(502,Kalai,Perai,3200)

scala> val r1 = sc.parallelize(List(ob1,ob2,ob3))
r1: org.apache.spark.rdd.RDD[Person] = ParallelCollectionRDD[40] at parallelize at <console>:40

scala> r1.collect.foreach(println)
Person(500,Sathya,Bangalore,3000)
Person(501,Lakshmi,Chennai,3100)
Person(502,Kalai,Perai,3200)

// Converting the in-memory collection into a DataFrame
scala> val dfCollection = r1.toDF
dfCollection: org.apache.spark.sql.DataFrame = [id: int, name: string ... 2 more fields]

scala> dfCollection.printSchema
root
 |-- id: integer (nullable = false)
 |-- name: string (nullable = true)
 |-- city: string (nullable = true)
 |-- salary: integer (nullable = false)


scala> dfCollection.show
+---+-------+---------+------+
| id|   name|     city|salary|
+---+-------+---------+------+
|500| Sathya|Bangalore|  3000|
|501|Lakshmi|  Chennai|  3100|
|502|  Kalai|    Perai|  3200|
+---+-------+---------+------+
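
For a collection this small, the explicit RDD step is optional; a sketch building the DataFrame straight from a Seq of case-class instances (again relying on spark.implicits._, already imported in spark-shell):

// build the DataFrame directly, skipping the intermediate RDD
val dfDirect = Seq(
  Person(500, "Sathya", "Bangalore", 3000),
  Person(501, "Lakshmi", "Chennai", 3100),
  Person(502, "Kalai", "Perai", 3200)).toDF
dfDirect.show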



// Writing the in-memory collection to MongoDB

scala> import com.mongodb.spark.config.{ReadConfig, WriteConfig}
import com.mongodb.spark.config.{ReadConfig, WriteConfig}

// Writer Configuration : test.person
scala> val writeConfig = WriteConfig(Map("uri" -> "mongodb://127.0.0.1/test.person"))
writeConfig: com.mongodb.spark.config.WriteConfig.Self = WriteConfig(test,person,Some(mongodb://127.0.0.1/test.person),true,512,15,WriteConcernConfig(None,None,None,None),None,false,true)

scala> MongoSpark.save(dfCollection.write.mode("append"), writeConfig)
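
Equivalently, the connector registers itself as a Spark data source, so the write can go through the standard DataFrameWriter API; a sketch assuming the same URI as the WriteConfig above:

// standard data source API; "com.mongodb.spark.sql.DefaultSource" is the connector's source name
dfCollection.write
  .format("com.mongodb.spark.sql.DefaultSource")
  .mode("append")
  .option("uri", "mongodb://127.0.0.1/test.person")
  .save()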


// Back to Mongo
> db.person.find().pretty()
{
    "_id" : ObjectId("5c7815d1d44e3fc2ebd26d47"),
    "id" : 100,
    "name" : "Sankara",
    "salary" : 3000,
    "city" : "Pallathur"
}
{
    "_id" : ObjectId("5c7815d1d44e3fc2ebd26d48"),
    "id" : 101,
    "name" : "Rasee",
    "salary" : 3100,
    "city" : "Kanadukathan"
}
{
    "_id" : ObjectId("5c7816f165e80e7efcae207d"),
    "id" : 500,
    "name" : "Sathya",
    "city" : "Bangalore",
    "salary" : 3000
}
{
    "_id" : ObjectId("5c7816f165e80e7efcae207e"),
    "id" : 501,
    "name" : "Lakshmi",
    "city" : "Chennai",
    "salary" : 3100
}
{
    "_id" : ObjectId("5c7816f165e80e7efcae207f"),
    "id" : 502,
    "name" : "Kalai",
    "city" : "Perai",
    "salary" : 3200
}

// Back to Spark: df is evaluated lazily, so re-running show re-reads the
// collection and now picks up the rows written above
scala> df.show
+--------------------+------------+---+-------+------+
|                 _id|        city| id|   name|salary|
+--------------------+------------+---+-------+------+
|[5c7815d1d44e3fc2...|   Pallathur|100|Sankara|  3000|
|[5c7815d1d44e3fc2...|Kanadukathan|101|  Rasee|  3100|
|[5c7816f165e80e7e...|   Bangalore|500| Sathya|  3000|
|[5c7816f165e80e7e...|     Chennai|501|Lakshmi|  3100|
|[5c7816f165e80e7e...|       Perai|502|  Kalai|  3200|
+--------------------+------------+---+-------+------+

// Dropping the collection in Mongo
> db.person.drop()
true

// The DataFrame now comes back empty: each action re-reads the collection,
// which no longer exists
scala> df.show
+---+----+---+----+------+
|_id|city| id|name|salary|
+---+----+---+----+------+
+---+----+---+----+------+


// Create a new database

> use Retail

// After "use Retail", db already refers to the Retail database, so the
// collection is addressed as db.person2 (db.retail.person2 would instead
// create a collection literally named "retail.person2")
> db.person2.insert([{"name":"Sankara","id":101}])

> db.person2.find()
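
Reading the new collection from Spark follows the same pattern used for test.person above; a sketch with a ReadConfig pointed at Retail.person2:

// read Retail.person2 the same way test.person was read earlier
val retailConfig = ReadConfig(Map("uri" -> "mongodb://127.0.0.1/", "database" -> "Retail", "collection" -> "person2"))
val dfRetail = spark.read.mongo(retailConfig)
dfRetail.show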


