Have you tried to make Spark and Elasticsearch play well together but run into snags? You’re not alone. On StackOverflow there are over 500 questions about integrating Spark and Elasticsearch. This post walks through how to do it seamlessly. We’ll focus on doing this with PySpark as opposed to Spark’s other APIs (Java, Scala, etc.).
The code for this guide is on GitHub. Feel free to follow along!
Elasticsearch-Hadoop
First, you need to ensure that the Elasticsearch-Hadoop connector library is installed across your Spark cluster. It must be available at the same path on each of your nodes. You can install it with something like:
wget http://download.elastic.co/hadoop/elasticsearch-hadoop-6.1.1.zip
unzip elasticsearch-hadoop-6.1.1.zip
Now, when you run spark-submit, ensure that the elasticsearch-hadoop jar is specified:
spark-submit --jars /path_to/elasticsearch-hadoop-6.1.1/dist/elasticsearch-spark-20_2.11-6.1.1.jar
Unfortunately, there is no pip install’ing elasticsearch-hadoop; you must have the binary available and the jar path specified for Spark. If you are running a Jupyter Notebook, you can add the jar to the environment prior to establishing a SparkContext with:
import os
from pyspark import SparkContext

# set environment variable PYSPARK_SUBMIT_ARGS so the jar is passed to Spark
os.environ['PYSPARK_SUBMIT_ARGS'] = '--jars elasticsearch-hadoop-6.1.1/dist/elasticsearch-spark-20_2.11-6.1.1.jar pyspark-shell'

# invoke SparkContext (which picks up the environment variable)
sc = SparkContext(appName="PythonSparkStreaming")
Now, our SparkContext sc has access to the elasticsearch-hadoop library. Bingo!
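If the notebook doesn’t seem to pick up the jar, a quick way to see what was actually passed along is to print the submit arguments back out:

# confirm the submit arguments the notebook session will use
print(os.environ['PYSPARK_SUBMIT_ARGS'])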
Writing to Elasticsearch
Specifying a configuration
To write an RDD to Elasticsearch you first need to specify a configuration. This includes the location of the Elasticsearch cluster’s Master Node, its port, and the index and document type you are writing to. There are numerous other things that can be configured prior to writing to Elasticsearch, but we’ll save those for later. Your conf should look something like:
es_write_conf = {
    # specify the node that we are sending data to (this should be the master)
    "es.nodes": 'localhost',
    # specify the port in case it is not the default port
    "es.port": '9200',
    # specify a resource in the form 'index/doc-type'
    "es.resource": 'testindex/testdoc',
    # is the input JSON?
    "es.input.json": "yes",
    # is there a field in the mapping that should be used to specify the ES document ID?
    "es.mapping.id": "doc_id"
}
We have indicated that our Elasticsearch Master is at localhost:9200 and that we are writing to the testindex index with the testdoc document type. We plan to write JSON, and there is a field called doc_id in the JSON within our RDD which we wish to use as the Elasticsearch document ID.
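Before going further, it can save some head-scratching to confirm that Elasticsearch is actually reachable at that address. A minimal check using only Python 3’s standard library (adjust the host and port to match your conf) might look like:

import json
from urllib.request import urlopen

# hit the cluster's root endpoint; a healthy node answers with its name and version info
with urlopen('http://localhost:9200') as resp:
    print(json.loads(resp.read().decode('utf-8')))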
Prepping the data
Now, we need to ensure that our RDD has records of the type:
(0, "{'some_key': 'some_value', 'doc_id': 123}")
Note that we have an RDD of tuples. The first value in the tuple (the key) actually doesn’t matter; elasticsearch-hadoop only cares about the value. You need to ensure that the value is a valid JSON string. So, you could try starting with data like:
data = [
    {'some_key': 'some_value', 'doc_id': 123},
    {'some_key': 'some_value', 'doc_id': 456},
    {'some_key': 'some_value', 'doc_id': 789}
]
Now we can create an RDD from this:
rdd = sc.parallelize(data)
Now we can define a function to format this into an elasticsearch-hadoop compatible RDD:
import json

def format_data(x):
    return (x['doc_id'], json.dumps(x))

rdd = rdd.map(lambda x: format_data(x))
We have just set the doc_id to be the first value in the tuple and the second value to be the JSON-encoded string of the dictionary we began with. If we pass a plain Python dictionary to elasticsearch-hadoop it will puke. Make sure you’ve run json.dumps() on that record!
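It’s worth peeking at one record to confirm the shape is right before writing (the key ordering inside the JSON string may vary):

# each record should now be a (doc_id, json_string) tuple
rdd.take(1)
# e.g. [(123, '{"some_key": "some_value", "doc_id": 123}')]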
Executing the Write Operation
Now we’re in business. You just need to execute the following action
on your RDD
to write it to Elasticsearch:
rdd.saveAsNewAPIHadoopFile(
    path='-',
    outputFormatClass="org.elasticsearch.hadoop.mr.EsOutputFormat",
    keyClass="org.apache.hadoop.io.NullWritable",
    valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
    # critically, we must specify our `es_write_conf`
    conf=es_write_conf)
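If the job finishes without errors, the documents should now be sitting in the testindex index. One quick, Spark-free way to confirm this is Elasticsearch’s _count API, again using only Python 3’s standard library:

import json
from urllib.request import urlopen

# ask Elasticsearch how many documents are in the index we just wrote to
with urlopen('http://localhost:9200/testindex/_count') as resp:
    print(json.loads(resp.read().decode('utf-8'))['count'])
# should print 3 if all of the sample records above were indexed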
Reading from Elasticsearch
Now that we have some data in Elasticsearch, we can read it back in using the elasticsearch-hadoop connector.
es_read_conf = {
    # specify the node that we are reading data from (this should be the master)
    "es.nodes": 'localhost',
    # specify the port in case it is not the default port
    "es.port": '9200',
    # specify a resource in the form 'index/doc-type'
    "es.resource": 'testindex/testdoc'
}
We’re simply telling Spark to use the same Elasticsearch Master Node and resource as the ones we just wrote to. Now we can generate an RDD from this Elasticsearch data:
es_rdd = sc.newAPIHadoopRDD(
    inputFormatClass="org.elasticsearch.hadoop.mr.EsInputFormat",
    keyClass="org.apache.hadoop.io.NullWritable",
    valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
    conf=es_read_conf)
To test that everything worked, try running es_rdd.take(1) on the RDD. You should get back something like [(u'123', {'some_key': 'some_value', 'doc_id': 123})]: the Elasticsearch document ID as the key and the document’s fields as the value.
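One more trick worth knowing: if you only need a subset of the index, elasticsearch-hadoop can push a query down to Elasticsearch via its es.query setting, so Spark only ever sees the matching documents. A minimal sketch building on the es_read_conf above, using the URI-style query format:

# copy the read conf and add a query so only matching documents are fetched
es_query_conf = dict(es_read_conf)
es_query_conf["es.query"] = "?q=some_key:some_value"

filtered_rdd = sc.newAPIHadoopRDD(
    inputFormatClass="org.elasticsearch.hadoop.mr.EsInputFormat",
    keyClass="org.apache.hadoop.io.NullWritable",
    valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
    conf=es_query_conf)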
Final Thoughts
The mechanics of reading and writing from Spark to Elasticsearch can be confusing, and the performance-tuning aspects even more so. Spark is extremely powerful and can easily overwhelm an Elasticsearch cluster. Combined, these technologies power many of the largest tech companies in the world. If you’re interested in learning more about how you can combine Spark and Elasticsearch, or are facing issues with speed or reliability in your setup, let us know!