Oliver Eilhard edited this page Jan 17, 2016 · 18 revisions

Bulk Processor

This wiki page describes unfinished features (currently available in the bulk-processor branch).

A Bulk Processor is a service that can be started to receive bulk requests and commit them to Elasticsearch in the background. It is similar to the BulkProcessor in the Java Client API but has some conceptual differences.

Setup

The Bulk Processor is created by a service, just like any other service in Elastic.

client, err := elastic.NewClient()
if err != nil { ... }

// Set up a bulk processor
service := client.BulkProcessor().Name("MyBackgroundWorker-1")

The service is responsible for setting up and starting the bulk processor. In the example above, we set just one property: the name of the bulk processor to be created. There are other parameters you can use, e.g. the number of workers or the thresholds that determine when a list of bulk requests is committed (see below for a complete list). The service basically follows the builder pattern.
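The builder idea can be sketched in a few lines of self-contained Go. Everything here is hypothetical (the types `processorService` and `processor`, the default of one worker, and the validation in `Do`); only the chaining style and the method names Name, Workers, and Do mirror the examples on this page, and Elastic's real service does considerably more.

```go
package main

import (
	"errors"
	"fmt"
)

// processorService is a hypothetical builder: each setter stores a value
// and returns the receiver so that calls can be chained.
type processorService struct {
	name    string
	workers int
}

// processor is a hypothetical stand-in for a configured bulk processor.
type processor struct {
	name    string
	workers int
}

// newProcessorService returns a builder with an assumed default of 1 worker.
func newProcessorService() *processorService {
	return &processorService{workers: 1}
}

func (s *processorService) Name(name string) *processorService {
	s.name = name
	return s
}

func (s *processorService) Workers(n int) *processorService {
	s.workers = n
	return s
}

// Do validates the collected settings and only then builds the processor.
func (s *processorService) Do() (*processor, error) {
	if s.workers < 1 {
		return nil, errors.New("workers must be >= 1")
	}
	return &processor{name: s.name, workers: s.workers}, nil
}

func main() {
	p, err := newProcessorService().Name("MyBackgroundWorker-1").Workers(2).Do()
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Printf("%s with %d workers\n", p.name, p.workers)
}
```

Nothing is built until Do is called, so invalid combinations of settings can be rejected in one place.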

To finally get a started bulk processor that you can send bulk requests to, call the Do method of the service. With other services, the Do method executes a request; with the Bulk Processor service, it starts a Bulk Processor. Here's a typical example:

client, err := elastic.NewClient()
if err != nil { ... }

// Set up a bulk processor
p, err := client.BulkProcessor().Name("MyBackgroundWorker-1").Workers(2).Do()
if err != nil { ... }

Notice that the Do method actually spins up some goroutines. To clean up safely, you need to call Close on the bulk processor. The bulk processor implements the io.Closer interface, so you can group it with other resources that need cleanup when your application stops. Here's an example of starting and stopping:

client, err := elastic.NewClient()
if err != nil { ... }

// Set up a bulk processor
p, err := client.BulkProcessor().Name("MyBackgroundWorker-1").Workers(2).Do()
if err != nil { ... }

// ... Do some work here

// Stop the bulk processor and do some cleanup
err = p.Close()
if err != nil { ... }
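As a rough illustration of why Close matters, the sketch below models a processor whose constructor starts worker goroutines and whose Close method stops them and waits for them to finish, which is what makes it safe to call at shutdown. The `processor` type, `start` function, and `closeAll` helper are hypothetical stand-ins, not Elastic's implementation; `closeAll` shows one way to clean up several io.Closer values together, in reverse order of creation.

```go
package main

import (
	"fmt"
	"io"
	"sync"
)

// processor is a hypothetical stand-in that owns background goroutines.
type processor struct {
	requests chan string
	wg       sync.WaitGroup
	mu       sync.Mutex
	handled  int
}

// start spins up n worker goroutines, similar in spirit to what Do does.
func start(n int) *processor {
	p := &processor{requests: make(chan string)}
	for i := 0; i < n; i++ {
		p.wg.Add(1)
		go func() {
			defer p.wg.Done()
			for range p.requests { // loop ends once requests is closed
				p.mu.Lock()
				p.handled++
				p.mu.Unlock()
			}
		}()
	}
	return p
}

// Add hands a request to whichever worker is free.
func (p *processor) Add(r string) { p.requests <- r }

// Close stops accepting work and blocks until all workers have returned.
func (p *processor) Close() error {
	close(p.requests)
	p.wg.Wait()
	return nil
}

// Compile-time check that *processor satisfies io.Closer.
var _ io.Closer = (*processor)(nil)

// closeAll closes resources in reverse order and returns the first error.
func closeAll(closers ...io.Closer) error {
	var first error
	for i := len(closers) - 1; i >= 0; i-- {
		if err := closers[i].Close(); err != nil && first == nil {
			first = err
		}
	}
	return first
}

func main() {
	p := start(2)
	for i := 0; i < 10; i++ {
		p.Add(fmt.Sprintf("req-%d", i))
	}
	if err := closeAll(p); err != nil {
		fmt.Println("close failed:", err)
	}
	fmt.Println("handled:", p.handled)
}
```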

Indexing

You can add bulk requests to the bulk processor by using its Add method. Add accepts a BulkableRequest, i.e. a BulkIndexRequest (for indexing), a BulkUpdateRequest (for updating), or a BulkDeleteRequest (for deleting).

// Say we want to index a tweet
t := Tweet{User: "telexico", Message: "Elasticsearch is great."}

// Create a bulk index request
// NOTICE: It is important to set the index and type here!
r := elastic.NewBulkIndexRequest().Index("twitter").Type("tweet").Id("12345").Doc(t)

// Add the request r to the processor p
p.Add(r)

Notice how we set the index and type on the request level. We need to do this with every request sent to a bulk processor; otherwise it can't tell Elasticsearch where to index the document.

Thresholds

When you add a new request via Add, the request is not immediately committed to Elasticsearch. The whole idea of a bulk processor is to gather requests and send them to Elasticsearch in batches.

So when does the bulk processor send these batches? There are three parameters you can control:

  1. When the number of requests in the batch reaches a certain count.
  2. When the size of the batch reaches a certain number of bytes.
  3. When a certain time interval has elapsed.

To specify the threshold for the number of requests, use the BulkActions(int) function on the bulk processor service. To specify the threshold for the size of the requests (in bytes), use the BulkSize(int) function. To flush automatically at a fixed interval, use the FlushInterval(time.Duration) function.

You can combine all three options:

client, err := elastic.NewClient()
if err != nil { ... }

// Set up a bulk processor
p, err := client.BulkProcessor().
    Name("MyBackgroundWorker-1").
    Workers(2).
    BulkActions(1000).              // commit if # requests >= 1000
    BulkSize(2 << 20).              // commit if size of requests >= 2 MB
    FlushInterval(30*time.Second).  // commit every 30s
    Do()
if err != nil { ... }

Manual flush

Committing bulk requests automatically based on a policy works fine, especially for long-running background tasks. Sometimes, however, you need to write e.g. a migration process that must commit all requests before the program exits. While calling Close on the bulk processor ensures that, there's a second way to make sure all data is sent to Elasticsearch: Flush.

Here's an example that manually asks all workers of the bulk processor to flush their data to Elasticsearch:

client, err := elastic.NewClient()
if err != nil { ... }

// Set up a bulk processor
p, err := client.BulkProcessor().
    Name("MyBackgroundWorker-1").
    Workers(2).
    Do()
if err != nil { ... }

// ... Do some work here ...

// Ask workers to commit all requests
err = p.Flush()
if err != nil { ... }

Notice that Flush is synchronous: it waits until all workers have acknowledged that their requests have been written.
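One way to make a flush synchronous is to send each worker a flush request that carries its own acknowledgement channel and then wait on every channel. The sketch below assumes that design; the `worker` type and `flushAll` function are hypothetical, and "writing" only appends to an in-memory slice instead of calling Elasticsearch.

```go
package main

import (
	"fmt"
	"sync"
)

// worker is a hypothetical stand-in that buffers requests until flushed.
type worker struct {
	add     chan string
	flush   chan chan struct{} // each flush request carries an ack channel
	mu      sync.Mutex
	written []string // what has been "sent to Elasticsearch"
}

// newWorker starts the worker loop (shutdown is omitted for brevity).
func newWorker() *worker {
	w := &worker{add: make(chan string), flush: make(chan chan struct{})}
	go w.run()
	return w
}

func (w *worker) run() {
	var pending []string
	for {
		select {
		case r := <-w.add:
			pending = append(pending, r)
		case ack := <-w.flush:
			w.mu.Lock()
			w.written = append(w.written, pending...)
			w.mu.Unlock()
			pending = nil
			close(ack) // acknowledge: everything queued so far is written
		}
	}
}

func (w *worker) writtenCount() int {
	w.mu.Lock()
	defer w.mu.Unlock()
	return len(w.written)
}

// flushAll asks every worker to flush and returns only after all have acked.
func flushAll(workers []*worker) {
	acks := make([]chan struct{}, len(workers))
	for i, w := range workers {
		acks[i] = make(chan struct{})
		w.flush <- acks[i]
	}
	for _, ack := range acks {
		<-ack
	}
}

func main() {
	workers := []*worker{newWorker(), newWorker()}
	for i := 0; i < 6; i++ {
		workers[i%2].add <- fmt.Sprintf("req-%d", i)
	}
	flushAll(workers)
	fmt.Println("written:", workers[0].writtenCount()+workers[1].writtenCount())
}
```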

Before/After callbacks

To get more control over how the bulk processor sends requests to Elasticsearch and processes the responses, you can use the Before and After callbacks.

The Before callback is a function that gets a sequential execution identifier and the list of bulk requests that will be sent to Elasticsearch. As its name implies, it gets called before sending the requests to Elasticsearch.

The After callback is a function that gets a sequential execution identifier, the list of bulk requests that we've tried to send, the response from Elasticsearch, and an error (which can be nil). The After callback is called after the requests have been sent to Elasticsearch.

What is the purpose of the Before and After callbacks? First, you can collect statistics on throughput or use them for logging. More importantly, you can use the callbacks to react when things go wrong. For example, if your Elasticsearch cluster goes down, the bulk processor will keep trying to commit all your requests. If they cannot be processed and you do nothing, requests pile up and the system might eventually crash. A good way to find out that Elasticsearch has a problem is to set up an After callback and watch its error parameter. If it is not nil, something has gone wrong, and you should probably throttle or even stop passing more requests to the bulk processor.

Error handling

Starting a background process like a bulk processor is straightforward as long as things go smoothly. It gets much harder when errors occur.

As we saw in the last section, error handling and throttling can be done with the After callback. This section describes what happens when errors occur and what options you have to build resilient systems.

First of all, the bulk processor retries on failure using exponential backoff. This means that when a worker fails to commit a list of requests to Elasticsearch, it automatically retries. The worker starts with a small retry interval of, say, 500ms, and the interval grows exponentially with every failed attempt. This is why it's called exponential backoff.

If the bulk processor fails many times in a row, it eventually gives up. At that point, the After callback receives a non-nil error parameter, which warns you that something is going wrong. However, the bulk processor has no way of knowing the correct way to handle the problem; the caller has to decide on an appropriate solution. For example, an application that indexes log files could simply stop processing and start a new bulk processor later. Another application with critical data might instead stop passing requests to the bulk processor until the Elasticsearch cluster is up again. The bulk processor promises nothing beyond "we try our best to put your data into Elasticsearch".

Stats

If you're the logging, tracing, and statistics person, we've got you covered. If you ask the bulk processor to collect statistics, you can retrieve them later.

// Create a client
client, err := elastic.NewClient()
if err != nil { ... }

// Set up a bulk processor
p, err := client.BulkProcessor().
    Name("MyBackgroundWorker-1").
    Stats(true).                   // enable collecting stats
    Do()
if err != nil { ... }

// ... Do some work here ...

// Get a snapshot of stats (always blank if not enabled; see above)
stats := p.Stats()

fmt.Printf("Number of times flush has been invoked: %d\n", stats.Flushed)
fmt.Printf("Number of times workers committed reqs: %d\n", stats.Committed)
fmt.Printf("Number of requests indexed            : %d\n", stats.Indexed)
fmt.Printf("Number of requests reported as created: %d\n", stats.Created)
fmt.Printf("Number of requests reported as updated: %d\n", stats.Updated)
fmt.Printf("Number of requests reported as success: %d\n", stats.Succeeded)
fmt.Printf("Number of requests reported as failed : %d\n", stats.Failed)

for i, w := range stats.Workers {
  fmt.Printf("Worker %d: Number of requests queued: %d\n", i, w.Queued)
  fmt.Printf("           Last response time       : %v\n", w.LastDuration)
}
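Since the workers update counters while you read them, a stats snapshot is naturally implemented as a copy taken under a lock. The sketch below assumes that approach; the `collector` type is hypothetical, and only the counter names Committed, Succeeded, Failed, and Flushed echo the fields printed above.

```go
package main

import (
	"fmt"
	"sync"
)

// stats mirrors a few counters printed above (hypothetical type).
type stats struct {
	Flushed, Committed, Succeeded, Failed int64
}

// collector guards counters that several workers update concurrently.
type collector struct {
	mu      sync.Mutex
	enabled bool // set once before workers start
	s       stats
}

// record is called by a worker after each commit.
func (c *collector) record(succeeded, failed int64) {
	if !c.enabled {
		return // stats stay blank when collection is disabled
	}
	c.mu.Lock()
	defer c.mu.Unlock()
	c.s.Committed++
	c.s.Succeeded += succeeded
	c.s.Failed += failed
}

// snapshot returns a copy, so callers can read it without holding the lock.
func (c *collector) snapshot() stats {
	c.mu.Lock()
	defer c.mu.Unlock()
	return c.s
}

func main() {
	c := &collector{enabled: true}
	var wg sync.WaitGroup
	for w := 0; w < 4; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for i := 0; i < 100; i++ {
				c.record(9, 1)
			}
		}()
	}
	wg.Wait()
	s := c.snapshot()
	fmt.Printf("committed=%d succeeded=%d failed=%d\n", s.Committed, s.Succeeded, s.Failed)
}
```

Because the snapshot is a value copy, later updates by the workers do not change a snapshot you already hold.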

Example usage

See this gist for a working example of a failure-resilient process that uses BulkProcessor under the hood.
