How can I batch process requests?

Hello guys,
I need to build an API with great performance, and I want to write it in Elixir.
I have a slow process that I must run after some requests. I want to implement this flow:

On each request, save the received data in memory.
After x requests (or after x seconds), send it to another API.

In Node I can do this:

let batchData = []
const handlerRequest = (req, res) => {
  batchData.push(req.body.data)
  if (batchData.length > 1000) {
    // Process to send to another api
    batchData = []
  }
  res.json({ success: true })
}

Or

let batchData = []
setInterval(() => {
  if (batchData.length > 1000) {
    // Process to send to another api
    batchData = []
  }
}, 10000)

const handlerRequest = (req, res) => {
  batchData.push(req.body.data)
  res.json({ success: true })
}

How can I do something like this in Elixir/Phoenix?

Thanks for this


You can do this with OTP. Here is a simple GenServer that sends itself a regular tick and checks the size of its state when data is pushed.

defmodule Demo.Gs do
  use GenServer

  @interval 10_000
  @initial_state <<>>

  def start_link(arg \\ @initial_state) do
    GenServer.start_link(__MODULE__, arg, name: __MODULE__)
  end

  def push(data) do
    GenServer.cast(__MODULE__, {:push, data})
  end

  def init(arg) do
    # schedule the first periodic flush
    Process.send_after(self(), :tick, @interval)
    {:ok, arg}
  end

  def handle_cast({:push, data}, state) do
    state = data <> state

    # flush early if the accumulated data grows past 1_000 characters
    case String.length(state) > 1_000 do
      true ->
        process(state)
        {:noreply, @initial_state}

      false ->
        {:noreply, state}
    end
  end

  def handle_info(:tick, state) do
    # periodic flush, then schedule the next tick
    process(state)
    Process.send_after(self(), :tick, @interval)
    {:noreply, @initial_state}
  end

  defp process(state) do
    IO.puts(state)
  end
end

It is just a simple example that prints the state every 10 seconds, or whenever the pushed data grows past 1_000 characters.

PS: You just need to replace process/1 with your own implementation.
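
To wire this into Phoenix, you would start the GenServer under the application's supervision tree and call push/1 from a controller action, replying to the client right away. A minimal sketch, assuming an app named Demo with a hypothetical DemoWeb.BatchController (the module names are just placeholders):

# lib/demo/application.ex
defmodule Demo.Application do
  use Application

  def start(_type, _args) do
    children = [
      DemoWeb.Endpoint,
      # the tuple form passes the initial (empty binary) state to start_link/1
      {Demo.Gs, <<>>}
    ]

    Supervisor.start_link(children, strategy: :one_for_one, name: Demo.Supervisor)
  end
end

# a controller that pushes the incoming data (a binary, since the GenServer
# concatenates binaries) and responds immediately
defmodule DemoWeb.BatchController do
  use DemoWeb, :controller

  def create(conn, %{"data" => data}) do
    Demo.Gs.push(data)
    json(conn, %{success: true})
  end
end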


Thank you so much for this!

My final solution:

defmodule Speed2.BatchProcess do
  use GenServer
  @initial_state []
  @interval 10000

  def start_link do
    GenServer.start_link(__MODULE__, [])
  end

  def init(initial_data) do
    Process.send_after(self(), :tick, @interval)
    {:ok, initial_data}
  end

  def handle_info(:tick, state) do
    process(state)
    Process.send_after(self(), :tick, @interval)
    {:noreply, @initial_state}
  end

  def get_my_state(process_id) do
    GenServer.call(process_id, :get_the_state)
  end

  def push(process_id, value) do
    GenServer.call(process_id, {:push, value})
  end

  def clean(process_id) do
    GenServer.call(process_id, :clean)
  end

  def process(state) do
    IO.inspect state
  end

  def handle_call(:get_the_state, _from, my_state) do
    {:reply, my_state, my_state}
  end

  def handle_call({:push, value}, _from, my_state) do
    new_state = my_state ++ [value]
    case Enum.count(new_state) > 10 do
      true -> 
        process(new_state)
        {:reply, new_state, @initial_state}
      false ->
        {:reply, new_state, new_state}
    end
  end

  def handle_call(:clean, _from, my_state) do
    {:reply, my_state, @initial_state}
  end
end
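
For reference, since this version is started without a name, the pid returned by start_link/0 is what gets passed to push/2 and the other client functions. A quick IEx session with placeholder data:

{:ok, pid} = Speed2.BatchProcess.start_link()

Speed2.BatchProcess.push(pid, %{user: 1})
Speed2.BatchProcess.push(pid, %{user: 2})

Speed2.BatchProcess.get_my_state(pid)
#=> [%{user: 1}, %{user: 2}]

# the push that takes the list past 10 elements calls process/1 and resets
# the state; the :tick message also flushes it every 10 seconds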

Nice, but as a piece of advice, I prefer to separate the client API from the server callbacks. That means moving handle_info, handle_call and init down in your code, grouped together with the other server callbacks :slight_smile:
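
For illustration, the same module laid out that way might look roughly like this (the batching logic is trimmed down here, just to show the grouping):

defmodule Speed2.BatchProcess do
  use GenServer

  ## Client API

  def start_link, do: GenServer.start_link(__MODULE__, [])
  def push(pid, value), do: GenServer.call(pid, {:push, value})
  def get_my_state(pid), do: GenServer.call(pid, :get_the_state)
  def clean(pid), do: GenServer.call(pid, :clean)

  ## Server callbacks

  def init(state) do
    Process.send_after(self(), :tick, 10_000)
    {:ok, state}
  end

  def handle_call(:get_the_state, _from, state), do: {:reply, state, state}

  def handle_call({:push, value}, _from, state) do
    new_state = state ++ [value]
    {:reply, new_state, new_state}
  end

  def handle_call(:clean, _from, state), do: {:reply, state, []}

  def handle_info(:tick, state) do
    Process.send_after(self(), :tick, 10_000)
    {:noreply, []}
  end
end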

Also…

new_state = my_state ++ [value]

# I would use the following instead, which is not the same...
# but I would push to the head of the list, and maybe reverse it later:

new_state = [value | my_state]

Also, the case do ... end could just as well be an if do ... else ... end.
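
Putting both suggestions together, the push clause could look something like this (a sketch based on the module above, reversing only at flush time so the original order is preserved):

def handle_call({:push, value}, _from, my_state) do
  new_state = [value | my_state]

  if Enum.count(new_state) > 10 do
    # reverse once, only when flushing, to restore insertion order
    new_state |> Enum.reverse() |> process()
    {:reply, new_state, @initial_state}
  else
    {:reply, new_state, new_state}
  end
end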