Hi there,
I’m trying to create a system in which there is a way to keep track of events that were passed to a GenStage producer. From what I’ve seen, GenStage is pretty much a one-way road and there seems to be no integrated way of eventually letting some caller know their event has been processed completely.
By that I mean that I would like the producer to keep track of all the events that are currently being processed and be aware of when they are completed. It should also offer a function for awaiting the result of an event, similar to Task.await/2.
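For reference, this is the Task.await/2 behaviour I would like to mirror: the caller gets a handle back immediately and only blocks once it asks for the result. A minimal sketch using only the standard library (the sleep, the return value and the 1_000 ms timeout are placeholders chosen for illustration):

```elixir
# Start some work in a separate process; the caller gets a handle right away.
task =
  Task.async(fn ->
    # Stand-in for real event processing.
    Process.sleep(50)
    {:ok, :processed}
  end)

# ... the caller is free to do other things here ...

# Block until the result arrives, or exit if it takes longer than 1_000 ms.
result = Task.await(task, 1_000)
IO.inspect(result)
```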
My approach to solving this would look like the following pseudocode:
defmodule Producer do
  use GenStage

  # init/1, handle_demand/2 and the state helpers are left out for brevity.

  # Client function for adding events; returns the task tracking the event
  def add_event(payload) do
    GenStage.call(Producer, {:add_event, payload})
  end

  # Client function for waiting until an event has been completed
  def await(task) do
    GenStage.cast(Producer, {:subscribe, self(), task})

    receive do
      # Pin the task so that only the reply for this event is matched
      {:finished, ^task, reply} -> reply
    end
  end

  # Create a new event with an associated task. The producer owns the task,
  # so the task's result is delivered to handle_info/2 below.
  def handle_call({:add_event, payload}, _from, state) do
    task =
      Task.Supervisor.async(Producer.TaskSupervisor, fn ->
        receive do
          {:finished, reply} -> reply
        end
      end)

    event = {task, payload}
    {:reply, task, [event], state}
  end

  # Allow processes to subscribe to an event
  def handle_cast({:subscribe, pid, task}, state) do
    state = subscribe_to_task(state, pid, task)
    {:noreply, [], state}
  end

  # Handle task replies and inform all subscribers of this event that it has
  # now been completed
  def handle_info({ref, reply}, state) when is_reference(ref) do
    # The task is done, so drop its monitor and any pending :DOWN message
    Process.demonitor(ref, [:flush])
    task = task_from_ref(state, ref)

    for subscriber <- get_subscribers_for_ref(state, ref) do
      send(subscriber, {:finished, task, reply})
    end

    {:noreply, [], state}
  end
end
defmodule Consumer do
  use GenStage

  # When an event is completed, send a {:finished, reply} message to the
  # process of its associated task
  def handle_events(events, _from, state) do
    for {task, _payload} <- events do
      send(task.pid, {:finished, :some_reply})
    end

    {:noreply, [], state}
  end
end
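The state helpers left out of the pseudocode could be sketched as plain functions over a map. This is only one possible shape: the module name `EventTracking`, the state layout and the extra `new/0`, `track_task/2` and `forget/2` functions are assumptions added for illustration. Only `subscribe_to_task/3`, `task_from_ref/2` and `get_subscribers_for_ref/2` appear in the pseudocode.

```elixir
defmodule EventTracking do
  # Assumed state shape: %{tasks: %{ref => task}, subscribers: %{ref => [pid]}}
  def new, do: %{tasks: %{}, subscribers: %{}}

  # Remember a task so it can be looked up when its reply arrives.
  def track_task(state, %Task{ref: ref} = task) do
    %{state | tasks: Map.put(state.tasks, ref, task)}
  end

  # Register pid as interested in the result of the given task.
  def subscribe_to_task(state, pid, %Task{ref: ref}) do
    subscribers = Map.update(state.subscribers, ref, [pid], &[pid | &1])
    %{state | subscribers: subscribers}
  end

  # Look up the task struct that belongs to a reply reference.
  def task_from_ref(state, ref), do: Map.get(state.tasks, ref)

  # All pids waiting on the task with this reference (empty list if none).
  def get_subscribers_for_ref(state, ref), do: Map.get(state.subscribers, ref, [])

  # Drop all bookkeeping for a finished task.
  def forget(state, ref) do
    %{
      state
      | tasks: Map.delete(state.tasks, ref),
        subscribers: Map.delete(state.subscribers, ref)
    }
  end
end

# Usage: track a task and subscribe the current process to it.
task = Task.async(fn -> :done end)

state =
  EventTracking.new()
  |> EventTracking.track_task(task)
  |> EventTracking.subscribe_to_task(self(), task)

IO.inspect(EventTracking.get_subscribers_for_ref(state, task.ref))
Task.await(task)
```

Keying both maps by the task's reference keeps the lookup in handle_info/2 cheap, since the reply message only carries the reference.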
Of course, I left out some parts of the implementation above. A more elaborate version could also send additional messages about an event's progress, not just a single message once it has been processed completely (e.g. by using Agents instead of Tasks).
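To make the progress idea a bit more concrete, here is a rough sketch using only the standard library. It assumes one Agent per event holding a status map; the map keys, the percentage steps and the worker task standing in for a consumer are all made up for illustration and are not part of the pseudocode above.

```elixir
# Hypothetical progress store: one Agent per event holding its current status.
{:ok, progress} = Agent.start_link(fn -> %{status: :queued, percent: 0} end)

# A worker (standing in for a consumer) updates the Agent as it goes.
worker =
  Task.async(fn ->
    Enum.each([25, 50, 75, 100], fn percent ->
      Agent.update(progress, fn _ -> %{status: :running, percent: percent} end)
    end)

    Agent.update(progress, fn current -> %{current | status: :finished} end)
    :some_reply
  end)

# Any interested process can check the current progress at any time...
IO.inspect(Agent.get(progress, fn current -> current end))

# ...and still wait for the final result.
reply = Task.await(worker)
final = Agent.get(progress, fn current -> current end)
IO.inspect({reply, final})
```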
What do you think? Am I completely overlooking something? Is this a tremendously bad idea? How do you handle cases where you want to be able to check and/or wait on the progress of an event?