How to update records in a Hive table that uses Elasticsearch for storage

I had a requirement to update records in a Hive table. Hive is more of an append-only database and you can't update records in it (that limitation comes from the text files in HDFS, which is how Hive stores data by default). But if you're using Hive with Elasticsearch as storage, you can get this to work. When Elasticsearch is the storage mechanism, every call from Hive to insert or delete data gets forwarded to the Elasticsearch API, and Elasticsearch is able to update existing records. I used this to implement an updatable Hive table.

For the scenario, let's assume you have an Elasticsearch index that stores first name, last name and email as a document. Create a new index in ES with the name cricketers and the type player by making a curl call like this.

curl -XPOST "http://localhost:9200/cricketers/player/1" -d'
{
    "id": "1",
    "fname": "First",
    "lname": "Cricketer",
    "email": "first.cricketer@gmail.com"
}'
This call first creates an index named cricketers in ES and inserts one document into it, with id 1. The next step is to define an external table in Hive that uses org.elasticsearch.hadoop.hive.EsStorageHandler as its storage handler and points to the cricketers/player index that you created in the last step. An important setting is 'es.mapping.id'='id', which says to use the value of the id column as the document id (primary key) in Elasticsearch.

create external table cricketers_es(id String, fname String, lname String, email String) stored by 'org.elasticsearch.hadoop.hive.EsStorageHandler' 
TBLPROPERTIES('es.resource'='cricketers/player', 'es.index.auto.create'='false', 'es.mapping.id'='id')
Once the table is created, you can check the records in it by executing select * from cricketers_es. You should see the 1 record that is in the index.
Since Hive does not have the concept of an update statement, you have to create a Hive table that holds the records you want to insert or update (only the delta), and then use this delta table to update the cricketers_es table. To do that, first create a text file that holds the delta of the records you want to update. In my case I created this simple cricketers.txt file and uploaded it to HDFS in the /user/hue folder.

1,sachin,tendulkar,sachin.tendulakar@bcci.com
2,Rahul,Dravid,rahul.dravid@bcci.com
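The file creation and upload can be scripted. This is a minimal sketch, assuming the hdfs command-line client is installed on the machine where you run it; when the client is missing, the script only creates the local file and prints a reminder.

```shell
# Create the delta file locally; the two rows are the ones shown above.
cat > cricketers.txt <<'EOF'
1,sachin,tendulkar,sachin.tendulakar@bcci.com
2,Rahul,Dravid,rahul.dravid@bcci.com
EOF

# Upload only when the hdfs client is available on this machine.
# -f overwrites an existing file with the same name in /user/hue.
if command -v hdfs >/dev/null 2>&1; then
    hdfs dfs -put -f cricketers.txt /user/hue/
else
    echo "hdfs client not found; upload cricketers.txt to /user/hue manually" >&2
fi
```

Keep the local copy around: LOAD DATA INPATH moves the file out of /user/hue into the table's storage location, so the file has to be uploaded again before each new load.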
After that, create a Hive table called cricketers_stage, which will hold the delta records, by executing the following statement.

create table cricketers_stage(id String, fname String, lname String, email String) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',';
Now use the following Hive statement to load your delta records into cricketers_stage.

LOAD DATA INPATH '/user/hue/cricketers.txt' INTO TABLE `default.cricketers_stage`
The next step is to switch the Elasticsearch write mode to upsert by setting the following property in the Hive console.

set es.write.operation=upsert;
The last step is to execute the following statement in Hive, which takes the content of cricketers_stage and inserts those records into the cricketers_es table.

insert into cricketers_es select * from cricketers_stage
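A property set in the Hive console applies only to the current session, so the upsert setting and the insert have to run in the same session. One way to guarantee that is to put both statements in a script file. This is a sketch, assuming the hive CLI is on the PATH; the file name upsert_cricketers.hql is just an illustrative choice.

```shell
# Write both statements to one script so they run in a single Hive session.
cat > upsert_cricketers.hql <<'EOF'
set es.write.operation=upsert;
insert into table cricketers_es select * from cricketers_stage;
EOF

# Run the script only when the hive CLI is available on this machine.
if command -v hive >/dev/null 2>&1; then
    hive -f upsert_cricketers.hql
else
    echo "hive CLI not found; run upsert_cricketers.hql from a Hive session" >&2
fi
```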
Now if you run select * from cricketers_es you should see 2 records: the first record is updated, and the record with id 2 is a new insert.

1,sachin,tendulkar,sachin.tendulakar@bcci.com
2,Rahul,Dravid,rahul.dravid@bcci.com
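To see why the result has exactly 2 rows, here is a small local simulation of an upsert keyed on the id column. It is only an illustration with awk on two throwaway files (existing.txt and delta.txt are hypothetical names), not what Elasticsearch does internally. Because every column is supplied in the delta, merging by id has the same effect as replacing the whole row.

```shell
# Existing index contents and the delta, as comma-separated rows keyed by id.
printf '%s\n' '1,First,Cricketer,first.cricketer@gmail.com' > existing.txt
printf '%s\n' \
    '1,sachin,tendulkar,sachin.tendulakar@bcci.com' \
    '2,Rahul,Dravid,rahul.dravid@bcci.com' > delta.txt

# Read the existing rows first, then the delta. A later row with the same id
# overwrites the earlier one, and a row with an unseen id is added.
upsert() {
    awk -F',' '{ rows[$1] = $0 } END { for (id in rows) print rows[id] }' "$1" "$2" \
        | sort -t',' -k1,1
}

upsert existing.txt delta.txt
```

The script prints two rows: id 1 takes its values from the delta, and id 2 appears as a new row.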
You can also verify the records in Elasticsearch by executing the following curl command.

curl -XPOST "http://localhost:9200/cricketers/player/_search?pretty=true" -d'
{
  "query": {
      "match_all": {}
  }
}'
