Rails, ElasticSearch and Go – merry go round

It all starts with a Rails application – business as usual. In one particular situation, we need to upload some complex excel sheet and update ElasticSearch with over 10k entries. The initial code was written in Ruby and run via sidekiq jobs but was taking about 28minutes to update the index!

This was a bottle neck we decided to eliminate and since there was a lot of scope for concurrency here, we decided to use Go for this. (Rather, I took up the challenge to reduce 28 minutes processing to under a minute – yeah! I am an aggressive optimist!).

So, I wrote the code to parse the excel sheet and make a series of nested hashes. So far so good and the time to do this was in a few seconds. Now, came the critical performance part of updating ElasticSearch. I used elastigo for this.

First cut: Synchronous requests to Elastic search.

As, we can see the code snippet below, there are 3 nested hashes and call es_add that creates the index for this data source.

# ~10k iterations in 3 nested for loops - synchronous requests. 
for cmake, v := range segments { // ~50 key pairs
        for model, j := range v {  // ~20 key pairs
                for insurance, segment := range j {  // ~10 key pairs
                                data := es_data{Cmake: cmake, Model: model}
                                prepare_es_data(&data, insurance, segment, prices)
                                es_add(&data)
                        }
                }
        }
}

This took 22 minutes but the user and system time taken are only a few seconds. So, most of the time has been spent in I/O for elastic search.

$ time go run example/sample1.go 
waiting.........................................................................................
................................................................................................
................................................................................................
.................Total:  10136

real    22m9.085s
user    0m9.263s
sys 0m2.111s

Here is the ElasticSearch code i.e. the code for es_add

var (
        esConn     *elastigo.Conn
        esIndex    string
)

func setup() {
        esIndex = "index-name"
        esConn = elastigo.NewConn()
        esConn.Domain = "<elastic search IP>"
        esConn.CreateIndex(esIndex)
}

func es_add_depreciation(data *es_data) {
        err := esConn.Index(esIndex, "type", "", nil, data)
        if err != nil {
                fmt.Printf("N")
        } else {
                fmt.Printf(".")
        }
}

Note: I print a “.” for success and a “N” for failure.

Second cut: Optimise this using go-routines. (10k go-routines)

I pushed the limit and added a go-routine per elastic search request for index creation! I added a WaitGroup. Note that I have used an anonymous go-routine with parameters because I want to ensure I use the right key in my data. Since Go supports closures, the top-level variables are available inside the go-routines and would keep changing values if I did not make a local copy!

# ~10k iterations with 10k go-routines
for cmake, v := range segments { // ~50 key-pairs
        for model, j := range v {  // ~20 key-pairs
                for insurance, segment := range j {  // ~10 key-pairs
                        wg.Add(1)
                        go func(cmake string, model string) {
                                defer wg.Done()
                                        data := es_data{Cmake: cmake, Model: model}
                                        prepare_es_data(&data, insurance, segment, prices)
                                        es_add(&data)
                                }(cmake, model, insurance, segment)
                        }
                }
        }
}

It took only 5 seconds. Hooray!
Unfortunately, it also had a lot of failures. On investigation, I found that elastic search throttles requests during index creation. I played around with index settings for store.throttle but to no avail.

$ time go run example/sample1.go 
waiting...
................................................................................................
.....................................................................................NNNNNNNNNNN
NNNNNNNN.NNNNNNNNNN.NN.NNNNNNN.NNNNNNNNNNNNNNNNNNNNN.NNNNNNN.NNN.NNNNNNNNNNNNNNNNNNNNNNNNNNNNNNN
NNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNN
NNNNNNNNNNNNN..NNNTotal:  10136

real    0m5.464s
user    0m4.658s
sys 0m0.920s

Third cut: Optimise further and reduce the number of go-routines

This also means lesser concurrent requests being sent to elastic search. Basically, I shifted the go-routine 1 level higher in the nested for loop.

# ~10k iterations with ~1k go-routines
for cmake, v := range segments { // ~50 key-pairs
                for model, j := range v {  // ~20 key-pairs
                        wg.Add(1)
                        go func(cmake string, model string) {
                                defer wg.Done()
                                for insurance, segment := range j {  // ~10 key-pairs
                                        data := es_data{Cmake: cmake, Model: model}
                                        prepare_es_data(&data, insurance, segment, prices)
                                        es_add(&data)
                                }
                        }(cmake, model)
                }
        }
}

Result improved a little but took more time. But failures 😦 We need to optimise more!

$ time go run example/sample1.go 
waiting...
................................................................................................N.NNNNNNN.NNN.N.NNNNN.N.N.NNNNNN.NNNNNNNNNNNN.NNNN.NNN.N...............................N.NNN.NNN
N.N.N.N.....NN..N.N.NN.N.NN.N.N.......N.N..N..........................N....................NNNNN
..Total:  10136

real    0m10.021s
user    0m9.829s
sys 0m0.901s

Fourth cut: Even lesser go-routines. (only 50)

Finally, I took the extreme route and was only 1 step behind doing this synchronously. Here, all my requests succeed and still this was pretty fast and took about 1 minute.

# ~10k iterations with 50 go-routines
for cmake, v := range segments { // ~50 key-pairs
                wg.Add(1)
                go func(cmake string) {
                        defer wg.Done()
                        for model, j := range v { // ~20 key-pairs
                                for insurance, segment := range j { // ~10 key-pairs
                                        data := es_data{Cmake: cmake, Model: model}
                                        prepare_es_data(&data, insurance, segment, prices)
                                        es_add(&data)
                                }
                        }
                }(cmake)
        }
}

But “Houston, we have a problem” – I started noticing inconsistencies in my data. Then I realised that I have made a local copy of the key but not the value! Hence I was getting data-mismatches. I need to make a local copy of the key-value v too and this would have to be a deep copy of this map every time! Now this seemed just wrong. Think! Think!

$ time go run example/sample1.go 
waiting..........................................................................................................................................................................................................................................................................................................Total:  10136

real    1m1.085s
user    0m11.263s
sys 0m3.111s

Fifth and final cut: ElasticSearch bulk APIs.

I read up on this and found the awesome code in elastigo that has BulkAPI support. Now, I reverted back my code to spawning 10k go-routines (1 per deep nested key-pair) and used the BulkAPI call for elastic search to do the work. Basically, I was back to my first code of firing up 10k go-routines. The difference was that I used the BulkIndexer API.

# ~10k iterations with 10k go-routines
for cmake, v := range segments { // ~50 key-pairs
        for model, j := range v {  // ~20 key-pairs
                for insurance, segment := range j {  // ~10 key-pairs
                        wg.Add(1)
                        go func(cmake string, model string) {
                                defer wg.Done()
                                        data := es_data{Cmake: cmake, Model: model}
                                        prepare_es_data(&data, insurance, segment, prices)
                                        es_add(&data)
                                }(cmake, model, insurance, segment)
                        }
                }
        }
}

And here is the modified ElasticSearch code that uses BulkIndexer. Now, every time there are more than 100 documents collected, it will send one BulkAPI index request.

var (
        esConn     *elastigo.Conn
        esIndexer  *elastigo.BulkIndexer
        esIndex    string
)

func es_setup() {
        esIndex = "index-name"
        esConn = elastigo.NewConn()
        esConn.Domain = "elastic search IP"

        esConn.CreateIndex(esDepreciationIndex)

        # 150 max connections and 2 second retry on failure.
        esIndexer = esConn.NewBulkIndexerErrors(150, 2)
        esIndexer.Start()

        // manage errors in a non-blocking way
        go func() {             
                for e := range esIndexer.ErrorChannel {
                        fmt.Printf("[Error]: %v\n", e.Err)
                }                               
        }()                             
}

func es_exit() {
        esIndexer.Stop() # This flushes pending requests before stopping.
}

func es_add_depreciation(data *es_data) {
        err := esIndexer.Index(esIndex, "type", "", "", "", nil, data)
        if err != nil {
                fmt.Printf("N")
        } else {
                fmt.Printf(".")
        }
}

This reduces the number of elastic search requests, guarantees data consistency and is fast too!
As you can see in the code above, we need to Start the indexer and Stop it when we are done. We also have a ErrorChannel that we monitor in a separate go-routine for errors!

Here is the performance metric:

$ time go run example/sample1.go 
.........................................................................................................................................................................................................................................................................
....................
Total:  10136

real    1m39.431s
user    0m6.576s
sys 0m0.849s

This is as good as it gets.

Conclusion

Use the right tools for the right job! We reduced 28 minutes of processing to 1.5 minutes with a good scalable solution using Go and ElasticSearch. The code above serves as an example of BulkIndexer as I did not find one easily on the internet.

5 thoughts on “Rails, ElasticSearch and Go – merry go round

    1. Well, We were using the default settings for ES and if we send requests synchronously, it would understandably be slow.

      We also realised that throttling was issue when indexes are merged. I researched as much as I could and found that the only way to avoid large number of index requests is by using Bulk API.

  1. Could’t this entire article have been replaced with a short note explaining how you discovered the Elasticsearch bulk API? I find it hard to believe Ruby was your bottleneck at, what, ~6 records/second?

    1. Well, I am not saying Ruby was the bottle neck. It was also the code that was written such that ES was updated synchronously.. if you look at my first-cut, even sending 10k requests synchronously via Go too a long time!

      No, this post could not have been replaced by my “discovering” bulk API. It also serves as an example of how to use BulkAPI with Go – no good examples out there on the net.

      Lastly, it also serves as a research guideline we followed and made changes ONLY because we required them. And this is not taking a dig at Ruby – there are some things that Go does faster and why not take that up?

Leave a reply to Thomas Hurst Cancel reply

This site uses Akismet to reduce spam. Learn how your comment data is processed.