Machine Learning & Big Data Blog

Spark ElasticSearch Hadoop Update and Upsert Example and Explanation

by Walker Rowe

Here we explain how to write Python code to update an ElasticSearch document from an Apache Spark dataframe and RDD.

There are few instructions on the internet. Those written by ElasticSearch are difficult to understand and offer no examples. So we make the simplest possible example here.

This code adds additional fields to an ElasticSearch (ES) JSON document, i.e., it updates the document. Spark has built-in native support for Scala and Java. But for Python you have to use the Elasticsearch-Hadoop connector, written by ElasticSearch. That makes this operation more complicated.

Code on Github

The code for this exercise is here:

  • Update ElasticSearch
  • Run code with spark-submit
  • Create Data

Prerequisites

  • ES. Download the binary and do not use apt-get install as the version stored there is too old.
  • Apache Spark.
  • Hadoop-ElasticSearch jar file. When you download it from here, it will provide jars for various languages.

Add Data

First we need to add two data records to ES. The key is the notation school/doc/1, which means to add this document to the index school, type doc, with id = 1.

curl -XPUT --header 'Content-Type: application/json' http://localhost:9200/school/doc/1 -d '{
   "school" : "Clemson"
}'

curl -XPUT --header 'Content-Type: application/json' http://localhost:9200/school/doc/2 -d '{
   "school" : "Harvard"
}'

Here is the script to run the Python code below as a spark-submit job. You do not need this if you step through the code one line at a time with pyspark.

#!/bin/bash

NUM_CORES=*
DRIVER_MEM=3g

JARS="/usr/share/spark/spark-2.3.2-bin-hadoop2.7/jars/elasticsearch-hadoop-6.4.2/dist"

export SPARK_HOME="/usr/share/spark/spark-2.3.2-bin-hadoop2.7"
export PATH=$PATH:$SPARK_HOME/bin
export CODEPATH="/home/ubuntu/Documents/esearch"

cd $CODEPATH

$SPARK_HOME/bin/spark-submit --master "local[$NUM_CORES]" \
    --driver-memory $DRIVER_MEM \
    --jars $JARS/elasticsearch-hadoop-6.4.2.jar \
    $CODEPATH/updateData.py

Code Explained

read.format opens a connection to ES. The important item to note is "es.read.metadata", "true". We need the metadata because it provides the document _id that we will need for the update operation.

reader = spark.read.format("org.elasticsearch.spark.sql") \
    .option("es.read.metadata", "true") \
    .option("es.nodes.wan.only", "true") \
    .option("es.port", "9200") \
    .option("es.net.ssl", "false") \
    .option("es.nodes", "http://localhost")

Next we read all the documents in the index school.

df = reader.load("school")
df.show()

Now we filter to find the one record that is equal to Harvard.

df.filter(df["school"] == "Harvard").show()

Note that df.filter() does not modify df in place. It returns a new dataframe containing only the matching rows, leaving the original dataframe unchanged. You can see that below: df.show() still displays both records, while the filtered result shows only Harvard.

df.show()
+-------+--------------------+
| school|           _metadata|
+-------+--------------------+
|Harvard|[_index -> school...|
|Clemson|[_index -> school...|
+-------+--------------------+

df.filter(df["school"] == "Harvard").show()
+-------+--------------------+
| school|           _metadata|
+-------+--------------------+
|Harvard|[_index -> school...|
+-------+--------------------+

We need to retrieve the _id from the _metadata column, which is a Map. First we filter down to the Harvard record and keep the result in its own dataframe, then use the collect() method to pull the rows back to the driver as Row objects. We read the first row, take the second column by index, and retrieve the _id by name. Then we use withColumn and lit to write that value as a new column with a constant value into the dataframe df2.

Note: there is only one row in the filtered dataframe. If there were more, we would have to perform a map operation over the rows to update all the records in the dataframe.

harvard = df.filter(df["school"] == "Harvard")
r = harvard.rdd.collect()
id = r[0][1]['_id']
df2 = harvard.withColumn("_id", lit(id))
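The extraction above works because collect() returns plain Row objects that can be indexed like tuples. As a sketch with hypothetical sample data shaped like the collected rows (a school column followed by a _metadata map), the multi-row case mentioned in the note would look like this:

```python
# Hypothetical sample data shaped like df.rdd.collect() output:
# each row is (school, _metadata), where _metadata is a map.
rows = [
    ("Harvard", {"_index": "school", "_type": "doc", "_id": "2"}),
    ("Clemson", {"_index": "school", "_type": "doc", "_id": "1"}),
]

# Second column by index, then _id by name, for every row.
ids = [row[1]["_id"] for row in rows]
print(ids)  # ['2', '1']
```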

Next we create the esconf object as a dictionary. In code examples on the internet you will see this as JSON. But the notation below is cleaner and easier to work with.

The important items to note are:

  • es.update.script.inline — the inline script to run. ctx._source.location means to update or create a field called location; ctx._source is the ES document source object.
  • es.update.script.params — the parameter values passed to the inline script es.update.script.inline. location:<Cambridge> passes the literal value Cambridge; the <> means to write a literal. If we wanted to pass a field value we would leave them off.
  • es.mapping.id — tells ES to look in the dataframe for the _id column and use that as the document ID _id. ES uses that to find the document we want to update.
  • es.write.operation — upsert means to add the document if it does not exist, otherwise update it. update means to only update an existing document.
esconf={}
esconf["es.mapping.id"] = "_id"
esconf["es.nodes"] = "localhost"
esconf["es.port"] = "9200"
esconf["es.update.script.inline"] = "ctx._source.location = params.location"
esconf["es.update.script.params"] = "location:<Cambridge>"
esconf["es.write.operation"] = "upsert"
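To make the <> convention concrete, here is a small sketch of how a location:<Cambridge> parameter string can be read; this is our own illustration of the notation, not the connector's actual parser:

```python
def interpret_param(spec):
    # Split "name:value"; <...> marks a literal constant, while a bare
    # name would reference a field in the document being written.
    name, value = spec.split(":", 1)
    if value.startswith("<") and value.endswith(">"):
        return name, ("literal", value[1:-1])
    return name, ("field", value)

print(interpret_param("location:<Cambridge>"))  # ('location', ('literal', 'Cambridge'))
```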

This writes the data. The **esconf means to unpack the dictionary esconf and pass each entry as a keyword option. The mode("append") means to add the fields to the existing document.

df2.write.format("org.elasticsearch.spark.sql").options(**esconf).mode("append").save("school/info")
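The **esconf unpacking is plain Python, not something specific to the connector: options(**esconf) is equivalent to passing each dictionary entry as a keyword argument. A minimal sketch with a stand-in function:

```python
def options(**opts):
    # Stand-in for DataFrameWriter.options, which simply records each
    # keyword argument as a configuration key/value pair.
    return dict(opts)

esconf = {"es.mapping.id": "_id", "es.write.operation": "upsert"}
print(options(**esconf) == esconf)  # True
```

CPython allows dotted strings like "es.mapping.id" to pass through **kwargs because it only requires the keys to be strings, which is why the connector can use this style for its configuration names.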

Now we look up the document and notice that the location field has been updated to Cambridge. Bunch of Ivy League snobs.

curl -X GET 'http://localhost:9200/school/info/_search'

{"took":1,"timed_out":false,"_shards":{"total":5,"successful":5,"skipped":0,"failed":0},"hits":{"total":1,"max_score":1.0,"hits":[{"_index":"school","_type":"info","_id":"2","_score":1.0,"_source":{"school":"Harvard","location":"Cambridge"}}]}}
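You can also check the result programmatically. The snippet below parses the search response shown above (the JSON is copied from that output) and pulls out the new field:

```python
import json

# The search response returned by the curl query above.
response_text = '{"took":1,"timed_out":false,"_shards":{"total":5,"successful":5,"skipped":0,"failed":0},"hits":{"total":1,"max_score":1.0,"hits":[{"_index":"school","_type":"info","_id":"2","_score":1.0,"_source":{"school":"Harvard","location":"Cambridge"}}]}}'

response = json.loads(response_text)
location = response["hits"]["hits"][0]["_source"]["location"]
print(location)  # Cambridge
```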

Complete Code

from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext
from pyspark.sql.functions import lit

conf = SparkConf().setAppName("updateSchools")
sc = SparkContext(conf=conf)
sc.setLogLevel("INFO")
spark = SQLContext(sc)

reader = spark.read.format("org.elasticsearch.spark.sql") \
    .option("es.read.metadata", "true") \
    .option("es.nodes.wan.only", "true") \
    .option("es.port", "9200") \
    .option("es.net.ssl", "false") \
    .option("es.nodes", "http://localhost")

df = reader.load("school")
df.show()

df.filter(df["school"] == "Harvard").show()

harvard = df.filter(df["school"] == "Harvard")
r = harvard.rdd.collect()
id = r[0][1]['_id']

df2 = harvard.withColumn("_id", lit(id))

esconf={}
esconf["es.mapping.id"] = "_id"
esconf["es.nodes"] = "localhost"
esconf["es.port"] = "9200"
esconf["es.update.script.inline"] = "ctx._source.location = params.location"
esconf["es.update.script.params"] = "location:<Cambridge>"
esconf["es.write.operation"] = "upsert"

df2.write.format("org.elasticsearch.spark.sql").options(**esconf).mode("append").save("school/info")


These postings are my own and do not necessarily represent BMC's position, strategies, or opinion.

About the author

Walker Rowe

Walker Rowe is an American freelance tech writer and programmer living in Tunisia. He specializes in big data, analytics, and programming languages. Find him on LinkedIn or Upwork.