Keeping track of event progress in GenStage

Hi there,

I’m trying to create a system in which there is a way to keep track of events that were passed to a GenStage producer. From what I’ve seen, GenStage is pretty much a one-way road and there seems to be no integrated way of eventually letting some caller know their event has been processed completely.

By that I mean that I would like the producer to keep track of all the events that are currently being processed and be aware of when they are completed. It should also have a callback for awaiting the response of an event, similar to Task.await/2.

My approach to solving this would be the following pseudocode:

defmodule Producer do
  #Callback for adding events
  def add_event(payload) do
    GenStage.call(Producer, {:add_event, payload})
  end

  # Client function: wait until an event has been completed.
  # Uses cast (with the Producer as target) to match handle_cast/2 below.
  def await(task) do
    GenStage.cast(Producer, {:subscribe, self(), task})
    receive do
      {:finished, task, reply} -> reply
    end
  end

  # Create a new event with an associated task
  def handle_call({:add_event, payload}, _from, state) do
    task = Task.Supervisor.async(Producer.TaskSupervisor, fn ->
      receive do
        {:finished, reply} -> reply
      end
    end)
    event = {task, payload}
    {:reply, task, [event], state}
  end

  #Allow processes to subscribe to an event
  def handle_cast({:subscribe, pid, task}, state) do
    state = subscribe_to_task(state, pid, task)
    {:noreply, [], state}
  end

  #Handle task replies and inform all subscribers to this event that it has
  #now been completed
  def handle_info({ref, reply}, state) do
    task = task_from_ref(state, ref)
    for subscriber <- get_subscribers_for_ref(state, ref) do
      send(subscriber, {:finished, task, reply})
    end
    {:noreply, [], state}
  end
end

defmodule Consumer do
  #When an event is completed, send {:finished, reply} message to its
  #associated task
  def handle_events(events, _from, state) do
    for {task, _payload} <- events do
      # The event carries a %Task{} struct, so message the task's pid
      send(task.pid, {:finished, :some_reply})
    end
    {:noreply, [], state}
  end
end

Of course, I left out some parts of the implementation above. A more elaborate version could also send additional messages about an event's progress, not just a message when it has been processed completely (e.g. by using Agents instead of Tasks).

What do you think? Am I completely overlooking something? Is this a tremendously bad idea? How do you handle cases where you want to be able to check and/or wait on the progress of an event?


It is my understanding that using receive inside a GenServer/GenStage callback is a no-no. The callback itself runs because the process's internal receive loop picked up a message, so blocking in a second receive can deadlock while you wait for more input. You can also break the GenServer/GenStage protocol by pulling a message out of the mailbox that was meant for that protocol rather than the one you were expecting. Because messages are asynchronous, you have no control over which message arrives next.

If you need to track progress, the best solution is a separate server for that purpose. Each stage reports the status of its events to that server with GenServer calls, identifying each event by some unique key in the event, so the tracking never blocks other activity in the stages. You do risk creating a bottleneck, with a single status server catching messages from every stage. If this is intended for high throughput, I would use cast instead of call for the status messages.
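
To make the separate-server idea concrete, here is a rough sketch using a plain GenServer. Everything in it is an assumption for illustration: the module name StatusTracker, the report/status/await functions, and the {:done, result} status convention are all made up, and your events would need to carry a unique key. Stages report with cast so they never block, and await avoids a hand-written receive by storing the caller's `from` and answering later with GenServer.reply/2.

```elixir
defmodule StatusTracker do
  # Hypothetical module: tracks the status of events by a unique key.
  use GenServer

  def start_link(opts \\ []) do
    GenServer.start_link(__MODULE__, :ok, opts)
  end

  # Called by stages. A cast, so reporting never blocks a stage.
  def report(tracker, key, status) do
    GenServer.cast(tracker, {:report, key, status})
  end

  # Non-blocking lookup of the latest reported status.
  def status(tracker, key) do
    GenServer.call(tracker, {:status, key})
  end

  # Blocks the caller until {:done, result} has been reported for key.
  def await(tracker, key, timeout \\ 5_000) do
    GenServer.call(tracker, {:await, key}, timeout)
  end

  @impl true
  def init(:ok) do
    {:ok, %{statuses: %{}, waiters: %{}}}
  end

  @impl true
  def handle_cast({:report, key, {:done, result} = status}, state) do
    # Answer everyone who is already waiting on this key.
    {waiting, waiters} = Map.pop(state.waiters, key, [])
    Enum.each(waiting, &GenServer.reply(&1, result))
    {:noreply, %{state | statuses: Map.put(state.statuses, key, status), waiters: waiters}}
  end

  def handle_cast({:report, key, status}, state) do
    {:noreply, %{state | statuses: Map.put(state.statuses, key, status)}}
  end

  @impl true
  def handle_call({:status, key}, _from, state) do
    {:reply, Map.get(state.statuses, key, :unknown), state}
  end

  def handle_call({:await, key}, from, state) do
    case state.statuses do
      %{^key => {:done, result}} ->
        # Already finished: reply immediately.
        {:reply, result, state}

      _ ->
        # Not finished yet: remember the caller and reply later.
        waiters = Map.update(state.waiters, key, [from], &[from | &1])
        {:noreply, %{state | waiters: waiters}}
    end
  end
end
```

A consumer would then call something like StatusTracker.report(tracker, key, {:done, reply}) from handle_events/3, and the original caller would use StatusTracker.await(tracker, key). Note that this sketch never removes finished entries, so a real version would need some cleanup strategy.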