
Bulk Processor

This wiki page describes unfinished features.

A Bulk Processor is a service that can be started to receive bulk requests and commit them to Elasticsearch in the background. It is similar to the BulkProcessor in the Java Client API but has some conceptual differences.

Setup

The Bulk Processor is created by a service, just like any other service in Elastic.

client, err := elastic.NewClient()
if err != nil { ... }

// Set up a bulk processor
service := client.BulkProcessor().Name("MyBackgroundWorker-1")

The service is responsible for setting up the bulk processor. In the example above, we set just one property: the name of the bulk processor to be created. There are other parameters as well, e.g. the number of workers or the thresholds that determine when a batch of bulk requests is committed (see below for a complete list). The service basically implements the builder pattern.

To finally get a running bulk processor you can send bulk requests to, use the Do method of the service. With other services, the Do method executes a request. With the Bulk Processor service, it starts a bulk processor. Here's a typical example:

client, err := elastic.NewClient()
if err != nil { ... }

// Set up a bulk processor
p, err := client.BulkProcessor().Name("MyBackgroundWorker-1").Workers(2).Do()
if err != nil { ... }

Notice that the Do method actually spins up some goroutines. If you want to clean up safely, you need to call Close on the bulk processor. The bulk processor implements the io.Closer interface, so you can combine it with other resources that need cleanup when your application stops. Here's an example of starting and stopping:

client, err := elastic.NewClient()
if err != nil { ... }

// Set up a bulk processor
p, err := client.BulkProcessor().Name("MyBackgroundWorker-1").Workers(2).Do()
if err != nil { ... }

// ... Do some work here

// Stop the bulk processor and do some cleanup
err = p.Close()
if err != nil { ... }

Indexing

You can add bulk requests to the bulk processor by using its Add function. Add accepts a BulkableRequest, so you can pass either a BulkIndexRequest (for indexing), a BulkUpdateRequest (for updating), or a BulkDeleteRequest (for deleting).

// Say we want to index a tweet
t := Tweet{User: "telexico", Message: "Elasticsearch is great."}

// Create a bulk index request for the tweet
// NOTICE: It is important to set the index and type here!
r := elastic.NewBulkIndexRequest().Index("twitter").Type("tweet").Id("12345").Doc(t)

// Add the request r to the processor p
p.Add(r)

Notice how we set the index and type on the request level. We need to do this with every request sent to a bulk processor; otherwise it won't be able to tell Elasticsearch how to index the document.
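
Updating and deleting work the same way. Here's a sketch with made-up IDs and a hypothetical partial document for the update:

// Update a tweet with a partial document
u := elastic.NewBulkUpdateRequest().Index("twitter").Type("tweet").Id("12345").
    Doc(map[string]interface{}{"message": "Elasticsearch is still great."})

// Delete another tweet
d := elastic.NewBulkDeleteRequest().Index("twitter").Type("tweet").Id("12346")

// Add both requests to the processor p
p.Add(u)
p.Add(d)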

Thresholds

When you add a new request via Add, the request is not immediately committed to Elasticsearch. The whole idea of a bulk processor is to gather requests and send them to Elasticsearch in batches.

Now, when does the bulk processor send these batches? There are three thresholds you can control:

  1. When the batch exceeds a certain number of requests.
  2. When the batch exceeds a certain size (in bytes).
  3. When a certain time interval has elapsed since the last commit.

To specify the threshold for the number of requests, use the BulkActions(int) function on the bulk processor service. To specify the threshold for the total size of the requests in bytes, use the BulkSize(int) function. To flush automatically at a fixed interval, use the FlushInterval(time.Duration) function.

You can combine all three options:

client, err := elastic.NewClient()
if err != nil { ... }

// Set up a bulk processor
p, err := client.BulkProcessor().
    Name("MyBackgroundWorker-1").
    Workers(2).
    BulkActions(1000).              // commit if # requests >= 1000
    BulkSize(2 << 20).              // commit if size of requests >= 2 MB
    FlushInterval(30*time.Second).  // commit every 30s
    Do()
if err != nil { ... }

Manual flush

TODO
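
In the meantime, here's a minimal sketch, assuming the processor's Flush method, which asks all workers to commit their outstanding requests right away:

// Add a few requests to the processor p
p.Add(r)

// Commit all pending requests now instead of waiting for a threshold
err = p.Flush()
if err != nil { ... }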

Before/After callbacks

TODO
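
A minimal sketch, assuming the service's Before and After hooks (of types BulkBeforeFunc and BulkAfterFunc), which are invoked around every commit:

// Invoked just before a batch of requests is committed
beforeFn := func(executionId int64, requests []elastic.BulkableRequest) {
    fmt.Printf("about to commit %d requests\n", len(requests))
}

// Invoked after a commit, with the bulk response and/or an error
afterFn := func(executionId int64, requests []elastic.BulkableRequest, response *elastic.BulkResponse, err error) {
    fmt.Printf("commit %d done\n", executionId)
}

p, err := client.BulkProcessor().
    Name("MyBackgroundWorker-1").
    Before(beforeFn).
    After(afterFn).
    Do()
if err != nil { ... }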

Error handling

TODO
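
A sketch of one approach, using the After callback from above: the err argument reports a commit that failed as a whole (e.g. a network problem), while the response's Failed helper collects the individual items Elasticsearch rejected:

afterFn := func(executionId int64, requests []elastic.BulkableRequest, response *elastic.BulkResponse, err error) {
    if err != nil {
        // the commit as a whole failed, e.g. due to a network error
        log.Printf("bulk commit %d failed: %v", executionId, err)
        return
    }
    // inspect the individual requests that Elasticsearch rejected
    for _, item := range response.Failed() {
        log.Printf("request with id %s failed with status %d", item.Id, item.Status)
    }
}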

Stats

TODO
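
A sketch, assuming stats collection is enabled with Stats(true) on the service; the processor's Stats method then returns counters such as the number of indexed, succeeded, and failed requests:

p, err := client.BulkProcessor().
    Name("MyBackgroundWorker-1").
    Stats(true). // enable collecting statistics
    Do()
if err != nil { ... }

// ... Do some work here

stats := p.Stats()
fmt.Printf("indexed %d, succeeded %d, failed %d\n", stats.Indexed, stats.Succeeded, stats.Failed)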

Example usage

TODO
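
A sketch that puts the pieces together; tweets is a hypothetical slice of the Tweet type used above:

client, err := elastic.NewClient()
if err != nil { ... }

p, err := client.BulkProcessor().
    Name("MyBackgroundWorker-1").
    Workers(2).
    BulkActions(1000).
    BulkSize(2 << 20).
    FlushInterval(30 * time.Second).
    Do()
if err != nil { ... }
defer p.Close()

// Queue an index request per tweet; the processor commits
// them in the background whenever a threshold is reached
for i, t := range tweets {
    r := elastic.NewBulkIndexRequest().
        Index("twitter").
        Type("tweet").
        Id(fmt.Sprintf("%d", i)).
        Doc(t)
    p.Add(r)
}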
