It all starts with a Rails application, business as usual. In one particular situation, we need to upload a fairly complex Excel sheet and update Elasticsearch with over 10k entries. The initial code was written in Ruby and run via Sidekiq jobs, but it was taking about 28 minutes to update the index!
This was a bottleneck we decided to eliminate, and since there was a lot of scope for concurrency here, we decided to use Go. (Rather, I took up the challenge of reducing 28 minutes of processing to under a minute. Yeah, I am an aggressive optimist!)
So, I wrote the code to parse the Excel sheet and build a series of nested hashes. So far so good, and this took only a few seconds. Now came the critical performance part: updating Elasticsearch. I used elastigo for this.
First cut: Synchronous requests to Elasticsearch.
As we can see in the code snippet below, there are 3 nested hashes, and we call es_add, which creates the index entry for this data source.
```go
// ~10k iterations in 3 nested for loops - synchronous requests.
for cmake, v := range segments { // ~50 key-pairs
	for model, j := range v { // ~20 key-pairs
		for insurance, segment := range j { // ~10 key-pairs
			data := es_data{Cmake: cmake, Model: model}
			prepare_es_data(&data, insurance, segment, prices)
			es_add(&data)
		}
	}
}
```
This took 22 minutes, yet the user and system times are only a few seconds. So most of the time was spent in I/O to Elasticsearch.
```
$ time go run example/sample1.go
waiting.........................................................................................
................................................................................................
................................................................................................
.................Total: 10136

real	22m9.085s
user	0m9.263s
sys	0m2.111s
```
Here is the Elasticsearch code, i.e. the code for es_add:
```go
var (
	esConn  *elastigo.Conn
	esIndex string
)

func setup() {
	esIndex = "index-name"
	esConn = elastigo.NewConn()
	esConn.Domain = "<elastic search IP>"
	esConn.CreateIndex(esIndex)
}

func es_add(data *es_data) {
	err := esConn.Index(esIndex, "type", "", nil, data)
	if err != nil {
		fmt.Printf("N")
	} else {
		fmt.Printf(".")
	}
}
```
Note: I print a “.” for success and an “N” for failure.
Second cut: Optimise this using go-routines. (10k go-routines)
I pushed the limit and spawned one go-routine per Elasticsearch index request! I added a WaitGroup to wait for them all. Note that I used an anonymous go-routine that takes the loop variables as parameters, because I want each go-routine to use the right keys for its data. Since Go supports closures, the enclosing loop variables are visible inside the go-routines, and their values would keep changing underneath me if I did not make a local copy!
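The capture pitfall is easy to demonstrate in isolation. This standalone sketch is mine, not the post's code (the name collect and the sample values are invented): passing the loop variable as a parameter gives each go-routine its own copy, whereas capturing it directly would, before Go 1.22, let every go-routine see whatever value the shared variable held when it actually ran.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// collect launches one go-routine per word and gathers the words back.
// Passing w as a parameter gives each go-routine its own copy; capturing
// the loop variable directly was unsafe before Go 1.22.
func collect(words []string) []string {
	var wg sync.WaitGroup
	out := make(chan string, len(words))
	for _, w := range words {
		wg.Add(1)
		go func(w string) { // w here is a per-go-routine copy
			defer wg.Done()
			out <- w
		}(w)
	}
	wg.Wait()
	close(out)
	var got []string
	for w := range out {
		got = append(got, w)
	}
	sort.Strings(got) // go-routines finish in arbitrary order
	return got
}

func main() {
	fmt.Println(collect([]string{"insurance-a", "insurance-b", "insurance-c"}))
	// prints "[insurance-a insurance-b insurance-c]"
}
```

Dropping the parameter and writing `go func() { out <- w }()` inside the loop is the bug described above: on older Go versions, several go-routines could observe the same (final) value of w.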
```go
// ~10k iterations with 10k go-routines
for cmake, v := range segments { // ~50 key-pairs
	for model, j := range v { // ~20 key-pairs
		for insurance, segment := range j { // ~10 key-pairs
			wg.Add(1)
			// Every loop variable is passed as a parameter so each
			// go-routine works on its own copy. seg_value stands for
			// the inner map's value type, which is not shown in the post.
			go func(cmake, model, insurance string, segment seg_value) {
				defer wg.Done()
				data := es_data{Cmake: cmake, Model: model}
				prepare_es_data(&data, insurance, segment, prices)
				es_add(&data)
			}(cmake, model, insurance, segment)
		}
	}
}
```
It took only 5 seconds. Hooray!
Unfortunately, it also had a lot of failures. On investigation, I found that Elasticsearch throttles requests during index creation. I played around with the store.throttle index settings, but to no avail.
```
$ time go run example/sample1.go
waiting...
................................................................................................
.....................................................................................NNNNNNNNNNN
NNNNNNNN.NNNNNNNNNN.NN.NNNNNNN.NNNNNNNNNNNNNNNNNNNNN.NNNNNNN.NNN.NNNNNNNNNNNNNNNNNNNNNNNNNNNNNNN
NNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNN
NNNNNNNNNNNNN..NNNTotal: 10136

real	0m5.464s
user	0m4.658s
sys	0m0.920s
```
Third cut: Optimise further and reduce the number of go-routines
This also means fewer concurrent requests being sent to Elasticsearch. Basically, I moved the go-routine one level higher in the nested for loop.
```go
// ~10k iterations with ~1k go-routines
for cmake, v := range segments { // ~50 key-pairs
	for model, j := range v { // ~20 key-pairs
		wg.Add(1)
		go func(cmake string, model string) {
			defer wg.Done()
			for insurance, segment := range j { // ~10 key-pairs
				data := es_data{Cmake: cmake, Model: model}
				prepare_es_data(&data, insurance, segment, prices)
				es_add(&data)
			}
		}(cmake, model)
	}
}
```
The failure rate improved a little, but the run took more time. And there were still failures 😦 We need to optimise more!
```
$ time go run example/sample1.go
waiting...
................................................................................................N.NNNNNNN.NNN.N.NNNNN.N.N.NNNNNN.NNNNNNNNNNNN.NNNN.NNN.N...............................N.NNN.NNN
N.N.N.N.....NN..N.N.NN.N.NN.N.N.......N.N..N..........................N....................NNNNN
..Total: 10136

real	0m10.021s
user	0m9.829s
sys	0m0.901s
```
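Before going to the extreme, it is worth noting a common Go idiom for this situation. This is a sketch of mine, not code from the post: instead of moving the go-routine up a loop level, a buffered channel can act as a counting semaphore that caps how many requests are in flight, whatever the loop shape. The names run and maxInFlight and the counting workload are invented for illustration; a real worker would call es_add where the comment indicates.

```go
package main

import (
	"fmt"
	"sync"
)

// run fires one go-routine per job but allows at most maxInFlight of
// them to run their work at once, using a buffered channel as a
// counting semaphore.
func run(jobs, maxInFlight int) int {
	sem := make(chan struct{}, maxInFlight)
	var (
		wg    sync.WaitGroup
		mu    sync.Mutex
		total int
	)
	for i := 0; i < jobs; i++ {
		wg.Add(1)
		sem <- struct{}{} // blocks while maxInFlight slots are taken
		go func() {
			defer wg.Done()
			defer func() { <-sem }() // release the slot when done
			// a real worker would call es_add(&data) here
			mu.Lock()
			total++
			mu.Unlock()
		}()
	}
	wg.Wait()
	return total
}

func main() {
	fmt.Println("processed:", run(200, 50)) // prints "processed: 200"
}
```

The nice property is that the concurrency limit becomes a single tunable number rather than a function of how the maps happen to nest.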
Fourth cut: Even fewer go-routines (only 50)
Finally, I took the extreme route and stopped just one step short of doing this synchronously. Here, all my requests succeeded, and it was still pretty fast, taking about 1 minute.
```go
// ~10k iterations with 50 go-routines
for cmake, v := range segments { // ~50 key-pairs
	wg.Add(1)
	go func(cmake string) {
		defer wg.Done()
		for model, j := range v { // ~20 key-pairs
			for insurance, segment := range j { // ~10 key-pairs
				data := es_data{Cmake: cmake, Model: model}
				prepare_es_data(&data, insurance, segment, prices)
				es_add(&data)
			}
		}
	}(cmake)
}
```
But “Houston, we have a problem”: I started noticing inconsistencies in my data. Then I realised that I had made a local copy of the key but not of the value! Hence the data mismatches. I would need a local copy of the value v too, and that means a deep copy of the map every time! Now this seemed just wrong. Think! Think!
```
$ time go run example/sample1.go
waiting..........................................................................................................................................................................................................................................................................................................Total: 10136

real	1m1.085s
user	0m11.263s
sys	0m3.111s
```
Fifth and final cut: Elasticsearch bulk API.
I read up on this and found the excellent code in elastigo that supports the bulk API. I reverted to spawning 10k go-routines (one per deeply nested key-pair) and used the bulk API calls to do the work. Basically, I was back to my second-cut code of firing up 10k go-routines; the difference was that I used the BulkIndexer API.
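For context on what BulkIndexer sends over the wire (this detail is mine, not from the post): the Elasticsearch _bulk endpoint takes newline-delimited JSON, where each action line names the index and type and the following line carries the document. The field names and values below are invented for illustration:

```
POST /_bulk
{ "index": { "_index": "index-name", "_type": "type" } }
{ "cmake": "honda", "model": "city", "insurance": "comprehensive" }
{ "index": { "_index": "index-name", "_type": "type" } }
{ "cmake": "honda", "model": "jazz", "insurance": "third-party" }
```

So instead of ~10k HTTP round-trips, the indexer batches documents and ships roughly one request per hundred of them.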
```go
// ~10k iterations with 10k go-routines
for cmake, v := range segments { // ~50 key-pairs
	for model, j := range v { // ~20 key-pairs
		for insurance, segment := range j { // ~10 key-pairs
			wg.Add(1)
			// Every loop variable is passed as a parameter; seg_value
			// stands for the inner map's value type, which is not
			// shown in the post.
			go func(cmake, model, insurance string, segment seg_value) {
				defer wg.Done()
				data := es_data{Cmake: cmake, Model: model}
				prepare_es_data(&data, insurance, segment, prices)
				es_add(&data)
			}(cmake, model, insurance, segment)
		}
	}
}
```
And here is the modified Elasticsearch code that uses BulkIndexer. Now, every time more than 100 documents have been collected, it sends a single bulk index request.
```go
var (
	esConn    *elastigo.Conn
	esIndexer *elastigo.BulkIndexer
	esIndex   string
)

func es_setup() {
	esIndex = "index-name"
	esConn = elastigo.NewConn()
	esConn.Domain = "<elastic search IP>"
	esConn.CreateIndex(esIndex)

	// 150 max connections and a 2-second retry on failure.
	esIndexer = esConn.NewBulkIndexerErrors(150, 2)
	esIndexer.Start()

	// Manage errors in a non-blocking way.
	go func() {
		for e := range esIndexer.ErrorChannel {
			fmt.Printf("[Error]: %v\n", e.Err)
		}
	}()
}

func es_exit() {
	esIndexer.Stop() // This flushes pending requests before stopping.
}

func es_add(data *es_data) {
	err := esIndexer.Index(esIndex, "type", "", "", "", nil, data)
	if err != nil {
		fmt.Printf("N")
	} else {
		fmt.Printf(".")
	}
}
```
This reduces the number of Elasticsearch requests, guarantees data consistency and is fast too!
As you can see in the code above, we need to Start the indexer and Stop it when we are done. We also have an ErrorChannel that we monitor in a separate go-routine for errors!
Here is the performance metric:
```
$ time go run example/sample1.go
.........................................................................................................................................................................................................................................................................
....................
Total: 10136

real	1m39.431s
user	0m6.576s
sys	0m0.849s
```
This is as good as it gets.
Conclusion
Use the right tool for the job! We reduced 28 minutes of processing to about 1.5 minutes with a scalable solution using Go and Elasticsearch. The code above also serves as an example of elastigo's BulkIndexer, as I could not easily find one on the internet.
ErrorChannel, that sounds interesting. Now the core is not concerned with writing errors to logs or even handling them; I can concentrate more on the core logic. Nice!!
28 minutes?! Doesn’t sound like Ruby was your problem. FMI, could you elaborate on the bottlenecks?
Well, we were using the default settings for Elasticsearch, and if we send requests synchronously, it is understandably slow.
We also realised that throttling was an issue when indexes are merged. I researched as much as I could and found that the only way to avoid a large number of index requests is to use the bulk API.
Couldn’t this entire article have been replaced with a short note explaining how you discovered the Elasticsearch bulk API? I find it hard to believe Ruby was your bottleneck at, what, ~6 records/second?
Well, I am not saying Ruby was the bottleneck. It was also that the code was written such that Elasticsearch was updated synchronously. If you look at my first cut, even sending 10k requests synchronously via Go took a long time!
No, this post could not have been replaced by my “discovering” the bulk API. It also serves as an example of how to use the bulk API with Go; there are no good examples out there on the net.
Lastly, it also documents the research path we followed: we made changes ONLY because we required them. And this is not taking a dig at Ruby. There are some things that Go does faster, so why not take advantage of that?