Processing in batches from async events

Hello all!

This is my second day of struggling to implement batch processing… but with some limits. The idea is to receive events in a Phoenix channel (I can do this), send them directly to a GenStage producer (probably), then collect them up to some time limit (this I can’t do) and process them.

In other words, collect an unbounded stream of events from a websocket into batches that are limited in time. For example: collect objects in a queue for 1 second, then create a task to process them.

I’m not sure, but I can probably use GenStage and Flow for this; I just can’t find a good example of how to use them for this task.

I would be happy to get any help or examples of how to implement this.

Hi @michaelb, using GenStage or Flow is the best choice for this, because Flow has windows (such as periodic windows) that might benefit you. But since you have not posted any code or hints as to where you are stuck, it’s difficult to help you out.

Flow is mainly for parallel computations that make use of the available cores, but it can be leveraged for the use case above as well.

The above scenario can also be achieved with a GenServer based on timeout conditions. But please do provide more details about your issue, with some illustration.

Thank you for your answer!
Currently I’m here:

defmodule A.Collector do
  use GenStage

  def start_link do
    GenStage.start_link(__MODULE__, :ok, name: __MODULE__)
  end

  def init(:ok) do
    {:producer, []}
  end

  def handle_info(_, state), do: {:noreply, [], state}

  def add(event) do
    GenServer.cast(__MODULE__, {:add, event})
  end
  
  def handle_cast({:add, event}, state) do
    state = state ++ [event]
    {:noreply, [event], state}
  end

  def handle_demand(_, state) do
    state = []
    {:noreply, [], state}
  end
end

defmodule A.Flower do
  use Flow

  def start_link() do
    window = Flow.Window.periodic(1, :second)

    x =
      Flow.from_stage(A.Collector, window: window)
      |> Flow.map(fn x -> x end)
    Flow.start_link(x)
  end
end

But obviously it doesn’t work.

I think this may be a case of learning to walk before you start running. When working with GenStage it helps to have a solid understanding of how and why GenServer works.

Furthermore GenStage is used when you need to regulate backpressure - i.e. slow the producing end down so that the later stages in the pipeline don’t get overwhelmed by the data that needs to be processed - but it doesn’t really sound like you have that kind of a problem.

It sounds more like you just need a GenServer that collects events over some period of time and then starts a new, separate process (Task) to deal with those collected events.

This contrived example and brief explanation may be a potential starting point.
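The linked example is not reproduced here. As a rough illustration of the GenServer approach described above, a minimal sketch might look like the following. The module name `BatchCollector`, the `:interval` and `:handler` options, and the choice of `Task.start/1` are all assumptions made for illustration, not something taken from the thread.

```elixir
defmodule BatchCollector do
  # Hypothetical module: collects events and hands them off in timed batches.
  use GenServer

  # Assumed default flush interval: 1 second, in milliseconds.
  @default_interval 1_000

  def start_link(opts) do
    GenServer.start_link(__MODULE__, opts, name: Keyword.get(opts, :name, __MODULE__))
  end

  # Asynchronous, so the caller (e.g. a channel process) is never blocked.
  def add(server \\ __MODULE__, event), do: GenServer.cast(server, {:add, event})

  @impl true
  def init(opts) do
    interval = Keyword.get(opts, :interval, @default_interval)
    # :handler is an assumed option: a 1-arity function that receives each batch.
    handler = Keyword.fetch!(opts, :handler)
    schedule_flush(interval)
    {:ok, %{events: [], interval: interval, handler: handler}}
  end

  @impl true
  def handle_cast({:add, event}, state) do
    # Prepending is O(1); the batch is reversed once when it is flushed.
    {:noreply, %{state | events: [event | state.events]}}
  end

  @impl true
  def handle_info(:flush, state) do
    schedule_flush(state.interval)
    flush(state.events, state.handler)
    {:noreply, %{state | events: []}}
  end

  # Nothing collected during this interval: do not start a task.
  defp flush([], _handler), do: :ok

  defp flush(events, handler) do
    batch = Enum.reverse(events)
    # Unsupervised task for brevity; a real app would likely use Task.Supervisor.
    {:ok, _pid} = Task.start(fn -> handler.(batch) end)
    :ok
  end

  defp schedule_flush(interval), do: Process.send_after(self(), :flush, interval)
end
```

With something like this started under a supervisor, a channel's `handle_in/3` could call `BatchCollector.add(payload)` and return immediately. Because each batch runs in its own process, a slow batch does not delay collection of the next one. The trade-off is that nothing limits how many batches are in flight at once.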


@michaelb Are you trying to listen to a process continuously and, every second, take the collected output and process it? In other words, do you want a listener that polls your GenStage? Based on your code right now, the Flow is started as a separate process, and once the demand is complete it exits, which breaks the polling. As @peerreynders pointed out, you can use a GenServer that triggers your Flow every second, or at whatever timeout you require.

right!

Thank you! I will try.

Thank you all for your help!

I found a good example, “My GenStage producer runs out of work to do, I buffer demand, but how do I know there is new work?”, that works for me.
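That example is not reproduced here. As a rough sketch of the general pattern its title describes (a producer that buffers both incoming events and unmet demand), something like the following might serve as a starting point. The module name `EventProducer` and the `add/1` function are hypothetical. The `Mix.install/1` line only makes the snippet runnable as a standalone script and assumes the `gen_stage` package can be fetched; in a Mix project the dependency would go in `mix.exs` instead.

```elixir
# Assumption: run as a script; fetches the gen_stage package from Hex.
Mix.install([{:gen_stage, "~> 1.0"}])

defmodule EventProducer do
  # Hypothetical producer that buffers events until consumers ask for them,
  # and remembers unmet demand until events arrive.
  use GenStage

  def start_link(_opts \\ []) do
    GenStage.start_link(__MODULE__, :ok, name: __MODULE__)
  end

  # Called from outside (e.g. a channel) to push a new event in.
  def add(event), do: GenStage.cast(__MODULE__, {:add, event})

  @impl true
  def init(:ok) do
    # State: {queue of buffered events, demand not yet satisfied}.
    {:producer, {:queue.new(), 0}}
  end

  @impl true
  def handle_cast({:add, event}, {queue, pending}) do
    # New work arrived: serve any demand that was left waiting.
    dispatch(:queue.in(event, queue), pending, [])
  end

  @impl true
  def handle_demand(demand, {queue, pending}) do
    # New demand arrived: serve it from the buffer as far as possible.
    dispatch(queue, pending + demand, [])
  end

  # No outstanding demand: emit what was gathered, keep the rest buffered.
  defp dispatch(queue, 0, acc), do: {:noreply, Enum.reverse(acc), {queue, 0}}

  defp dispatch(queue, pending, acc) do
    case :queue.out(queue) do
      {{:value, event}, rest} -> dispatch(rest, pending - 1, [event | acc])
      # Buffer is empty: remember the remaining demand for later events.
      {:empty, queue} -> {:noreply, Enum.reverse(acc), {queue, pending}}
    end
  end
end
```

The key point is that both callbacks go through the same `dispatch/3` helper, so the producer learns about "new work" simply by being sent a cast, and no polling is needed. Time-limited batching can then be done on the consumer side.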
