Spark with MongoDB - Read / Write Operations
$ mongo // start CLI
MongoDB shell version v3.6.3
connecting to: mongodb://127.0.0.1:27017
MongoDB server version: 3.6.3
// Insert two documents into the person collection
> db.person.insert([{
"id":100,
"name":"Sankara",
"salary":3000,
"city":"Pallathur"
},
{
"id":101,
"name":"Rasee",
"salary":3100,
"city":"Kanadukathan"
}])
BulkWriteResult({
"writeErrors" : [ ],
"writeConcernErrors" : [ ],
"nInserted" : 2,
"nUpserted" : 0,
"nMatched" : 0,
"nModified" : 0,
"nRemoved" : 0,
"upserted" : [ ]
})
// select
> db.person.find()
{ "_id" : ObjectId("5c780db0d44e3fc2ebd26d43"), "id" : 100, "name" : "Sankara", "salary" : 3000, "city" : "Pallathur" }
{ "_id" : ObjectId("5c780db0d44e3fc2ebd26d44"), "id" : 101, "name" : "Rasee", "salary" : 3100, "city" : "Kanadukathan" }
// beautify
> db.person.find().pretty()
{
"_id" : ObjectId("5c780db0d44e3fc2ebd26d43"),
"id" : 100,
"name" : "Sankara",
"salary" : 3000,
"city" : "Pallathur"
}
{
"_id" : ObjectId("5c780db0d44e3fc2ebd26d44"),
"id" : 101,
"name" : "Rasee",
"salary" : 3100,
"city" : "Kanadukathan"
}
// Search with a condition
> db.person.find({"name":"Sankara"})
{ "_id" : ObjectId("5c780db0d44e3fc2ebd26d43"), "id" : 100, "name" : "Sankara", "salary" : 3000, "city" : "Pallathur" }
// AND and OR conditions
> db.person.find({$and:[{"name":"Sankara"},{"id":101}]})
// no output: no single document has both name "Sankara" and id 101
> db.person.find({$or:[{"name":"Sankara"},{"id":101}]})
{ "_id" : ObjectId("5c799a6092b6d263e5f35ba8"), "id" : 100, "name" : "Sankara", "salary" : 3000, "city" : "Pallathur" }
{ "_id" : ObjectId("5c799a6092b6d263e5f35ba9"), "id" : 101, "name" : "Rasee", "salary" : 3100, "city" : "Kanadukathan" }
// How to update a record in MongoDB
> db.person.find()
{ "_id" : ObjectId("5c799a6092b6d263e5f35ba8"), "id" : 100, "name" : "Sankara", "salary" : 3000, "city" : "Pallathur" }
{ "_id" : ObjectId("5c799a6092b6d263e5f35ba9"), "id" : 101, "name" : "Rasee", "salary" : 3100, "city" : "Kanadukathan" }
{ "_id" : ObjectId("5c79a09765e80e21e2e727ce"), "id" : 500, "name" : "Sathya", "city" : "Bangalore", "salary" : 3000 }
{ "_id" : ObjectId("5c79a09765e80e21e2e727cf"), "id" : 501, "name" : "Lakshmi", "city" : "Chennai", "salary" : 3100 }
{ "_id" : ObjectId("5c79a09765e80e21e2e727d0"), "id" : 502, "name" : "Kalai", "city" : "Perai", "salary" : 3200 }
> db.person.update({"id":100},{$set:{"name":"Arun"}})
WriteResult({ "nMatched" : 1, "nUpserted" : 0, "nModified" : 1 })
> db.person.find()
{ "_id" : ObjectId("5c799a6092b6d263e5f35ba9"), "id" : 101, "name" : "Rasee", "salary" : 3100, "city" : "Kanadukathan" }
{ "_id" : ObjectId("5c79a09765e80e21e2e727ce"), "id" : 500, "name" : "Sathya", "city" : "Bangalore", "salary" : 3000 }
{ "_id" : ObjectId("5c79a09765e80e21e2e727cf"), "id" : 501, "name" : "Lakshmi", "city" : "Chennai", "salary" : 3100 }
{ "_id" : ObjectId("5c79a09765e80e21e2e727d0"), "id" : 502, "name" : "Kalai", "city" : "Perai", "salary" : 3200 }
// Spark starts here
spark-shell --packages org.mongodb.spark:mongo-spark-connector_2.11:2.4.0
import com.mongodb.spark._
import com.mongodb.spark.config.ReadConfig
import com.mongodb.spark.sql._
import org.bson.Document
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder().appName("MongoPerson").master("local[*]").getOrCreate()
val readConfig = ReadConfig(Map("uri" -> "mongodb://127.0.0.1/", "database" -> "test", "collection" -> "person"))
val df = spark.read.mongo(readConfig) // loads the test.person collection into a DataFrame
scala> df.show
+--------------------+------------+-----+-------+------+
| _id| city| id| name|salary|
+--------------------+------------+-----+-------+------+
|[5c780db0d44e3fc2...| Pallathur|100.0|Sankara|3000.0|
|[5c780db0d44e3fc2...|Kanadukathan|101.0| Rasee|3100.0|
+--------------------+------------+-----+-------+------+
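The same DataFrame can also be loaded through the connector's MongoSpark helper instead of the implicit read.mongo method. A minimal sketch, assuming the spark session and readConfig created above:
// Equivalent load via the MongoSpark helper (in scope through the com.mongodb.spark._ import)
val dfAlt = MongoSpark.load(spark, readConfig)
dfAlt.printSchema // the connector infers the schema by sampling documents from the collection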
scala> df.select("city","id","name","salary").show
+------------+-----+-------+------+
| city| id| name|salary|
+------------+-----+-------+------+
| Pallathur|100.0|Sankara|3000.0|
|Kanadukathan|101.0| Rasee|3100.0|
+------------+-----+-------+------+
scala> df.createOrReplaceTempView("tblPerson") // SparkSQL
scala> spark.sql("select * from tblPerson").show
+--------------------+------------+-----+-------+------+
| _id| city| id| name|salary|
+--------------------+------------+-----+-------+------+
|[5c780db0d44e3fc2...| Pallathur|100.0|Sankara|3000.0|
|[5c780db0d44e3fc2...|Kanadukathan|101.0| Rasee|3100.0|
+--------------------+------------+-----+-------+------+
scala> spark.sql("select id,name,city,salary from tblPerson").filter("salary==3000").show
+-----+-------+---------+------+
| id| name| city|salary|
+-----+-------+---------+------+
|100.0|Sankara|Pallathur|3000.0|
+-----+-------+---------+------+
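The same query can be written with the DataFrame API instead of Spark SQL; an equivalent sketch against the df loaded above:
// DataFrame API equivalent of the SQL select + filter above
df.select("id", "name", "city", "salary")
  .filter(df("salary") === 3000)
  .show()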
// Build an in-memory collection in Spark
scala> case class Person(id:Int, name:String, city:String, salary:Int)
defined class Person
scala> val ob1 = new Person(500,"Sathya","Bangalore",3000)
ob1: Person = Person(500,Sathya,Bangalore,3000)
scala> val ob2 = new Person(501,"Lakshmi","Chennai",3100)
ob2: Person = Person(501,Lakshmi,Chennai,3100)
scala> val ob3 = new Person(502,"Kalai","Perai",3200)
ob3: Person = Person(502,Kalai,Perai,3200)
scala> val r1 = sc.parallelize(List(ob1,ob2,ob3))
r1: org.apache.spark.rdd.RDD[Person] = ParallelCollectionRDD[40] at parallelize at <console>:40
scala> r1.collect.foreach(println)
Person(500,Sathya,Bangalore,3000)
Person(501,Lakshmi,Chennai,3100)
Person(502,Kalai,Perai,3200)
// Convert the in-memory collection into a DataFrame
scala> val dfCollection = r1.toDF
dfCollection: org.apache.spark.sql.DataFrame = [id: int, name: string ... 2 more fields]
scala> dfCollection.printSchema
root
|-- id: integer (nullable = false)
|-- name: string (nullable = true)
|-- city: string (nullable = true)
|-- salary: integer (nullable = false)
scala> dfCollection.show
+---+-------+---------+------+
| id| name| city|salary|
+---+-------+---------+------+
|500| Sathya|Bangalore| 3000|
|501|Lakshmi| Chennai| 3100|
|502| Kalai| Perai| 3200|
+---+-------+---------+------+
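The intermediate RDD is not strictly required; the same DataFrame could be built straight from a local Seq. A sketch, assuming the Person case class above and the spark-shell implicits:
// Build the same DataFrame without going through an RDD
import spark.implicits._
val dfDirect = Seq(
  Person(500, "Sathya", "Bangalore", 3000),
  Person(501, "Lakshmi", "Chennai", 3100),
  Person(502, "Kalai", "Perai", 3200)
).toDF()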
// Write the in-memory collection to MongoDB
scala> import com.mongodb.spark.config.{ReadConfig, WriteConfig}
import com.mongodb.spark.config.{ReadConfig, WriteConfig}
// Writer Configuration : test.person
scala> val writeConfig = WriteConfig(Map("uri" -> "mongodb://127.0.0.1/test.person"))
writeConfig: com.mongodb.spark.config.WriteConfig.Self = WriteConfig(test,person,Some(mongodb://127.0.0.1/test.person),true,512,15,WriteConcernConfig(None,None,None,None),None,false,true)
scala> MongoSpark.save(dfCollection.write.mode("append"), writeConfig)
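The connector also registers a Spark SQL data source, so the same append could in principle go through DataFrameWriter directly; a sketch, assuming the data source accepts the same uri option that WriteConfig uses above:
// Alternative write through the connector's data source class
dfCollection.write
  .format("com.mongodb.spark.sql.DefaultSource")
  .mode("append")
  .option("uri", "mongodb://127.0.0.1/test.person")
  .save()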
// Back in the mongo shell
> db.person.find().pretty()
{
"_id" : ObjectId("5c7815d1d44e3fc2ebd26d47"),
"id" : 100,
"name" : "Sankara",
"salary" : 3000,
"city" : "Pallathur"
}
{
"_id" : ObjectId("5c7815d1d44e3fc2ebd26d48"),
"id" : 101,
"name" : "Rasee",
"salary" : 3100,
"city" : "Kanadukathan"
}
{
"_id" : ObjectId("5c7816f165e80e7efcae207d"),
"id" : 500,
"name" : "Sathya",
"city" : "Bangalore",
"salary" : 3000
}
{
"_id" : ObjectId("5c7816f165e80e7efcae207e"),
"id" : 501,
"name" : "Lakshmi",
"city" : "Chennai",
"salary" : 3100
}
{
"_id" : ObjectId("5c7816f165e80e7efcae207f"),
"id" : 502,
"name" : "Kalai",
"city" : "Perai",
"salary" : 3200
}
// Back to Spark
scala> df.show
+--------------------+------------+---+-------+------+
| _id| city| id| name|salary|
+--------------------+------------+---+-------+------+
|[5c7815d1d44e3fc2...| Pallathur|100|Sankara| 3000|
|[5c7815d1d44e3fc2...|Kanadukathan|101| Rasee| 3100|
|[5c7816f165e80e7e...| Bangalore|500| Sathya| 3000|
|[5c7816f165e80e7e...| Chennai|501|Lakshmi| 3100|
|[5c7816f165e80e7e...| Perai|502| Kalai| 3200|
+--------------------+------------+---+-------+------+
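Once the written documents show up in the DataFrame, ordinary DataFrame operations apply to the combined data; for example, a quick aggregation sketch:
// Average salary per city over the combined collection
df.groupBy("city").avg("salary").show()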
// Drop the collection in the mongo shell
> db.person.drop()
true
// The DataFrame is now empty: it is evaluated lazily, so each action re-reads the (dropped) collection
scala> df.show
+---+----+---+----+------+
|_id|city| id|name|salary|
+---+----+---+----+------+
+---+----+---+----+------+
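Because of that lazy evaluation, any action issued now re-queries MongoDB and sees the dropped collection; a quick check, as a sketch:
// Re-reads the dropped collection, so this should come back as 0
df.count()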
// Create a new database and collection
> use Retail
> db.retail.person2.insert([{"name":"Sankara","id":101}])
> db.retail.person2.find()
// note: in the mongo shell, db.retail.person2 addresses a collection literally named "retail.person2" inside the Retail database
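To pull the new database back into Spark, the same read pattern applies with a different ReadConfig; a minimal sketch, assuming the collection really was created as "retail.person2" inside the Retail database as above:
// Read the new Retail database with a fresh ReadConfig
val retailReadConfig = ReadConfig(Map(
  "uri" -> "mongodb://127.0.0.1/",
  "database" -> "Retail",
  "collection" -> "retail.person2"))
val dfRetail = spark.read.mongo(retailReadConfig)
dfRetail.show()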