Inserting data into Elasticsearch using GenStage, with elasticsearch_elixir_bulk_processor

A while ago I was using Java for an event-based Elasticsearch project. Elastic provides a Java-based library called Bulk Processor, this allows for configurable bulk inserts into Elasticsearch. Unfortunately, the configuration can only be set during instance creation. This inspired me to write an Elixir based equivalent processor which allows the configuration to be changed dynamically during run time. And of course, a good chance to utilize GenStages.

I hope this comes in useful to someone dealing with event sourcing and streams of data, I think this would be a good fit as a sink of data for Broadway based consumers.

Example usage:

alias ElasticsearchElixirBulkProcessor.Items.Index
[
  %Index{index: "test_index", source: %{"field" => "value1"}},
  %Index{index: "test_index", source: %{"field" => "value2"}},
  %Index{index: "test_index", source: %{"field" => "value3"}}
]
|> ElasticsearchElixirBulkProcessor.send_requests()

Feedback and comments are most welcome!

5 Likes