Spark and Elasticsearch integration
As the title suggests, in this blog I'm going to walk through the end-to-end process of writing data to and reading data from Elasticsearch in Spark.
Elasticsearch: Elasticsearch is a distributed, open source search and analytics engine, designed for horizontal scalability, reliability, and easy management. It combines the speed of search with the power of analytics via a sophisticated, developer-friendly query language covering structured, unstructured, and time-series data.
Before explaining the code, let me provide some configuration details. I'm assuming that you have already set up Elasticsearch on your machine; if not, please follow the Elasticsearch configuration blog first.
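elasticsearch-hadoop picks up its connection settings from the Spark configuration. Here is a minimal sketch, assuming Elasticsearch is running locally on the default port; the es.* property names come from the elasticsearch-hadoop configuration options, and the values shown are the defaults, spelled out only for illustration:

import org.apache.spark.SparkConf

// minimal Spark configuration for elasticsearch-hadoop; es.nodes and es.port
// default to localhost:9200, so they are shown here only for illustration
val sparkConf = new SparkConf()
  .setAppName("ES Spark")
  .setMaster("local[*]")
  .set("es.nodes", "localhost")
  .set("es.port", "9200")
  .set("es.index.auto.create", "true") // let the connector create the target index if it is missing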
You will need the elasticsearch-hadoop dependency:
<!-- https://mvnrepository.com/artifact/org.elasticsearch/elasticsearch-hadoop -->
<dependency>
    <groupId>org.elasticsearch</groupId>
    <artifactId>elasticsearch-hadoop</artifactId>
    <version>2.3.1</version>
</dependency>
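If your project is built with sbt rather than Maven, the equivalent line in build.sbt should look something like this:

// build.sbt: sbt equivalent of the Maven dependency above
libraryDependencies += "org.elasticsearch" % "elasticsearch-hadoop" % "2.3.1"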
Make sure your Elasticsearch instance is running: http://localhost:9200/
If it is up, you will see something like this:
{
  "name": "Tumbler",
  "cluster_name": "elasticsearch",
  "version": {
    "number": "2.1.1",
    "build_hash": "40e2c53a6b6c2972b3d13846e450e66f4375bd71",
    "build_timestamp": "2015-12-15T13:05:55Z",
    "build_snapshot": false,
    "lucene_version": "5.3.1"
  },
  "tagline": "You Know, for Search"
}
Check that your index and type exist. In my case the index is bank and the type is account: http://localhost:9200/bank/_search
If the index is available, it will return the data formatted as JSON.
Read data from Elasticsearch:
As I already mentioned, my index is bank and my type is account; using elasticsearch-hadoop we can create an RDD from that index/type information.
// load the elasticsearch index into a spark rdd
val account_rdd = sparkContext.esRDD("bank/account")
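esRDD also accepts an optional query as a second argument, so filtering can be pushed down to Elasticsearch instead of loading the whole index. A small sketch; the field name firstname is an assumption about my bank sample data:

// load only the documents matching a query-string query;
// "firstname" is an assumed field in the bank sample data
val filtered_rdd = sparkContext.esRDD("bank/account", "?q=firstname:Kevin")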
Once the index has been loaded into an RDD, we can apply any of the standard RDD operations to it.
import org.elasticsearch.spark._

// load the elasticsearch index into a spark rdd
val account_rdd = sparkContext.esRDD("bank/account")

// print 10 records of the rdd
account_rdd.take(10).foreach(println)

// count the records in the rdd
account_rdd.count
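Each element of the RDD is a pair of the document id and a Map of its fields, so ordinary transformations work as usual. A small sketch, assuming the bank sample data has a numeric balance field:

// keep only accounts whose (assumed) balance field exceeds 40000
val high_balance_rdd = account_rdd.filter {
  case (_, fields) => fields.get("balance").exists(_.toString.toDouble > 40000)
}
high_balance_rdd.take(5).foreach(println)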
Write data to Elasticsearch:
For testing purposes I'm providing a couple of maps here; you can load your own data and use case classes instead. I will cover those properly in the next part, but there is a small preview after the snippet below.
val sparkContext = new SparkContext(sparkConf)

// start writing to the elasticsearch index
val name = Map("Kevin" -> 1, "Harris" -> 2, "David" -> 3)
val address = Map("chelsea" -> "Otopeni", "NY" -> "San Fran", "SD" -> "San Diego")

sparkContext.makeRDD(Seq(name, address)).saveToEs("spark/docs")
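As a quick preview of the case-class variant mentioned above: the Account class and its fields are hypothetical, purely to show the shape, and saveToEs serializes each instance into a document.

// hypothetical case class; each instance becomes one document in the index
case class Account(id: Int, name: String, balance: Double)

val accounts = Seq(Account(1, "Kevin", 1500.0), Account(2, "Harris", 2300.0))
sparkContext.makeRDD(accounts).saveToEs("spark/accounts")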
Complete code:
import org.apache.spark.{SparkConf, SparkContext}
// import elasticsearch packages
import org.elasticsearch.spark._

object SparkESSample {

  def main(arg: Array[String]) {

    val sparkConf = new SparkConf().setAppName("ES Spark").setMaster("local[*]")
    val sparkContext = new SparkContext(sparkConf)

    // start writing to the elasticsearch index
    val name = Map("Kevin" -> 1, "Harris" -> 2, "David" -> 3)
    val address = Map("chelsea" -> "Otopeni", "NY" -> "San Fran", "SD" -> "San Diego")
    sparkContext.makeRDD(Seq(name, address)).saveToEs("spark/docs")

    // reading from the elasticsearch index:
    // load the elasticsearch index into a spark rdd
    val account_rdd = sparkContext.esRDD("bank/account")

    // print 10 records of the rdd
    account_rdd.take(10).foreach(println)

    // count the records in the rdd
    account_rdd.count
  }
}
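One more variant worth knowing: if your records are already serialized as JSON strings, the connector can index them as-is via saveJsonToEs. A minimal sketch with made-up documents:

import org.elasticsearch.spark._

// each element is a complete JSON document; saveJsonToEs indexes them without re-serializing
val json_docs = sparkContext.makeRDD(Seq(
  """{"name": "Kevin", "rank": 1}""",
  """{"name": "Harris", "rank": 2}"""
))
json_docs.saveJsonToEs("spark/json-docs")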