Hello!
If anyone has spare time, I’d be interested in any comments on some Elixir code I wrote. I’m a newbie to Elixir, but not to systems programming. I’d love feedback on what’s idiomatic in Elixir, use of OTP, naming conventions, etc.
I’m calling an API over the network that takes either (1) a single input or (2) an array of inputs.
For the sake of demonstration, let’s say the API is called through a function api/1 that just turns integers into strings. So (1) looks like this:
"1" = api(1)
"2" = api(2)
"3" = api(3)
… and (2) looks like this:
["1", "2", "3"] = api([1, 2, 3])
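To make the two call shapes concrete, here is a minimal local stand-in, assuming the real API behaves exactly as described. The module name `DemoApi` is hypothetical; the real `api/1` goes over the network.

```elixir
defmodule DemoApi do
  # Hypothetical stand-in for the real network API described above.
  # A single integer returns a single string.
  def api(input) when is_integer(input), do: Integer.to_string(input)

  # A list of integers returns a list of strings, in the same order.
  def api(inputs) when is_list(inputs), do: Enum.map(inputs, &Integer.to_string/1)
end

"1" = DemoApi.api(1)
["1", "2", "3"] = DemoApi.api([1, 2, 3])
```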
I have implemented batching manually before, but I thought I’d try to use Elixir to batch my API calls automatically. The annoying thing about batching is that it always requires significantly restructuring my code: a pipeline with Enum.map(&api/1) becomes a lot more complex.
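For comparison, this is roughly what manual batching looks like when all inputs are already in one list. It is only a sketch: `ManualBatching`, `run/2`, and the local `api/1` stub are hypothetical names, and it does not cover the harder case where calls arrive from many separate processes.

```elixir
defmodule ManualBatching do
  # Hypothetical stand-in for the batch form of the API.
  def api(inputs) when is_list(inputs), do: Enum.map(inputs, &Integer.to_string/1)

  # Splits the inputs into chunks of at most `batch_size`, calls the
  # API once per chunk, and flattens the results back into one list.
  def run(inputs, batch_size) do
    inputs
    |> Enum.chunk_every(batch_size)
    |> Enum.flat_map(&api/1)
  end
end

["1", "2", "3", "4", "5"] = ManualBatching.run([1, 2, 3, 4, 5], 2)
```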
I wrote this GenServer:
defmodule BatchingServer do
  use GenServer

  # Should return `{outputs, leftovers}`, where `outputs` is `%{from => output}` for each processed item,
  # and `leftovers` is a list of the leftover items to be processed in the *next* batch.
  @type batch_processor() ::
          ([{GenServer.from(), any()}] ->
             {%{GenServer.from() => any()}, [{GenServer.from(), any()}]})

  @type data() :: %{
          # Milliseconds of inactivity after which the pending batch is processed.
          interval_ms: non_neg_integer(),
          # Given a batch, returns true when the batch should be immediately processed.
          # Otherwise the batch will be processed when no calls have been received for `interval_ms`.
          should_process?: ([{GenServer.from(), any()}] -> boolean()),
          process_batch: batch_processor(),
          batch: [{GenServer.from(), any()}]
        }

  def init(data) do
    data = Map.put(data, :batch, [])
    {:ok, data}
  end

  # Fires when no call has arrived for `interval_ms`; flushes the pending batch.
  def handle_info(:timeout, data) do
    {replies, data} = process_batch(data)

    Enum.each(replies, fn {from, output} ->
      :ok = GenServer.reply(from, output)
    end)

    {:noreply, data}
  end

  # Client API: blocks until the batch containing `input` has been processed.
  def process(server, input) do
    GenServer.call(server, {:process, input})
  end

  def handle_call({:process, input}, from, data) do
    %{
      interval_ms: interval_ms,
      should_process?: should_process?,
      batch: batch
    } = data

    # Add the new call to the batch before asking whether to process it,
    # so the predicate sees the item that was just received.
    batch = [{from, input} | batch]
    data = %{data | batch: batch}

    if should_process?.(batch) do
      {replies, data} = process_batch(data)

      # Reply to every processed caller, including the current one if its
      # item was processed. A caller left in the leftovers keeps waiting.
      Enum.each(replies, fn {reply_from, output} ->
        :ok = GenServer.reply(reply_from, output)
      end)

      {:noreply, data}
    else
      {:noreply, data, interval_ms}
    end
  end

  @spec process_batch(data()) :: {%{GenServer.from() => any()}, data()}
  def process_batch(data) do
    %{
      process_batch: process_batch,
      batch: batch
    } = data

    {replies, leftovers} = process_batch.(batch)
    {replies, %{data | batch: leftovers}}
  end
end
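The `{outputs, leftovers}` contract is easier to see with a callback that actually leaves something over. Below is a sketch of a `process_batch` function that sends at most `max_size` items per API call. `CappedBatch`, `processor/1`, and the local `api/1` stub are hypothetical, and plain atoms stand in for real `GenServer.from()` values so the snippet runs on its own.

```elixir
defmodule CappedBatch do
  # Hypothetical stand-in for the batch form of the API.
  def api(inputs) when is_list(inputs), do: Enum.map(inputs, &Integer.to_string/1)

  # Builds a `process_batch` callback that handles at most `max_size`
  # items per API call and hands the rest back as leftovers.
  def processor(max_size) do
    fn batch ->
      # The server prepends new calls, so reverse to serve the oldest first.
      {now, later} = batch |> Enum.reverse() |> Enum.split(max_size)

      outputs =
        now
        |> Enum.map(fn {_from, input} -> input end)
        |> api()
        |> Enum.zip(now)
        |> Map.new(fn {output, {from, _input}} -> {from, output} end)

      # Restore newest-first order so leftovers match the server's batch layout.
      {outputs, Enum.reverse(later)}
    end
  end
end

process = CappedBatch.processor(2)
{outputs, leftovers} = process.([{:c, 3}, {:b, 2}, {:a, 1}])
true = outputs == %{a: "1", b: "2"}
[{:c, 3}] = leftovers
```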
When using it, the following:
# Pretend api/1 takes time to run...
inputs = [1, 2, 3]
inputs
|> Enum.map(fn input -> Task.async(fn -> api(input) end) end)
|> Task.await_many()
… becomes:
{:ok, server} =
  GenServer.start_link(BatchingServer, %{
    interval_ms: 1000,
    should_process?: fn batch ->
      length(batch) > 5
    end,
    process_batch: fn batch ->
      outputs =
        batch
        |> Enum.map(fn {_from, input} -> input end)
        |> api()
        |> Enum.zip(batch)
        # Enum.zip/2 yields `{output, {from, input}}` tuples, so match on a single tuple argument.
        |> Enum.map(fn {output, {from, _input}} -> {from, output} end)
        |> Enum.into(%{})

      {outputs, []}
    end
  })
inputs
|> Enum.map(fn input -> Task.async(fn -> BatchingServer.process(server, input) end) end)
|> Task.await_many()
I know it’s a bit of code and the example is super contrived. I’d love feedback!