Moving from GenEvent to GenStage

I’m writing a quiz system using Elixir. Every time a user chooses an answer to a question, an %AnswerPacket{} struct is created, containing information on the answer. This packet is then processed by two GenServers, ScoreServer and AnswerServer, both of which have an update/2 function, taking the server pid and the packet.

Currently, I’m implementing this using GenEvent. There’s a third process, PacketHandler, that has a :push_packet event which forwards the packet to ScoreServer and AnswerServer, as below:

def handle_event({:push_packet, packet}, %{scores: scores, answers: answers} = pids) do
  ScoreServer.update(scores, packet)
  AnswerServer.update(answers, packet)
  {:ok, pids}
end

This works pretty well, but I’m interested in how it could be reimplemented using GenStage. Am I right in thinking that PacketHandler would be the producer, and ScoreServer and AnswerServer the consumers? If so, how would PacketHandler work? Currently the event is triggered by a function call, but the examples I’ve seen of GenStage seem more like a stream. And how does it fit in with the concept of demand, if it’s dependent on the user choosing an answer?

I’m a bit confused, so any help would be appreciated xD


Based on my understanding, PacketHandler would be a producer and ScoreServer and AnswerServer would be consumers, as you expected. Once they are started, they are simply up and running, waiting for information to process. To get things started, when someone answers a question, you would send a message to PacketHandler letting it know there is information to process. Add something like this to the code that creates the %AnswerPacket{}:

send PacketHandler, :new_answer

and something like this to PacketHandler:

def handle_info(:new_answer, state) do
  ...
  {:noreply, events, state}
end

In the init function of PacketHandler, you can specify the :dispatcher option which allows you to tailor how PacketHandler communicates with ScoreServer and AnswerServer.
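To make that a bit more concrete, here is a minimal sketch of what such a producer could look like. It is not taken from your code: the push_packet/1 function and the use of GenStage.cast/2 instead of a raw send are my own assumptions, and it relies on GenStage's internal buffer to hold packets that arrive while the consumers have no outstanding demand. That is roughly how the "function call" style and the demand model meet: the call hands the packet to the stage, and GenStage delivers it once demand is available.

```elixir
defmodule PacketHandler do
  use GenStage

  # Registered under the module name so callers do not need the pid.
  def start_link do
    GenStage.start_link(__MODULE__, :ok, name: __MODULE__)
  end

  # Hypothetical public API: call this where the %AnswerPacket{} is created.
  def push_packet(packet) do
    GenStage.cast(__MODULE__, {:push_packet, packet})
  end

  @impl true
  def init(:ok) do
    # The :dispatcher option controls how events reach the consumers.
    {:producer, :ok, dispatcher: GenStage.BroadcastDispatcher}
  end

  @impl true
  def handle_cast({:push_packet, packet}, state) do
    # Emit the packet as an event; GenStage buffers it if there is no demand yet.
    {:noreply, [packet], state}
  end

  @impl true
  def handle_demand(_demand, state) do
    # Nothing to produce on request; events only appear when users answer.
    {:noreply, [], state}
  end
end
```

With this in place the calling code would do PacketHandler.push_packet(packet) rather than GenEvent.notify. If you need to queue packets yourself or reply to the caller only after dispatch, you would have to track pending demand in the state instead of leaning on the built-in buffer.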

Watching this talk a couple of times helped me wrap my head around GenStage.


Correct. To add to your reply, the PacketHandler likely wants to use the broadcast dispatcher, to ensure every event is broadcast to all consumers.

The GenStage documentation contains two examples of how a PacketHandler could be implemented (see the Broadcaster examples). There is another example in the source code.
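On the consumer side, a rough sketch could look like the module below. This is only an illustration under my own assumptions: PacketConsumer, its start_link/2 arguments, and the idea of wrapping the existing update/2 functions in a one-argument function are all hypothetical, not something from the GenStage docs or your code.

```elixir
defmodule PacketConsumer do
  use GenStage

  # `producer` is the name or pid of the producer stage.
  # `update_fun` is a one-argument function called once per packet.
  def start_link(producer, update_fun) do
    GenStage.start_link(__MODULE__, {producer, update_fun})
  end

  @impl true
  def init({producer, update_fun}) do
    # Subscribing here means the consumer starts asking for events right away.
    {:consumer, update_fun, subscribe_to: [producer]}
  end

  @impl true
  def handle_events(packets, _from, update_fun) do
    Enum.each(packets, update_fun)
    # Consumers never emit events, so the event list is always empty.
    {:noreply, [], update_fun}
  end
end
```

Assuming the producer is registered as PacketHandler, you would start one consumer per server, for example PacketConsumer.start_link(PacketHandler, &ScoreServer.update(scores, &1)) and the same again with AnswerServer. With the broadcast dispatcher, each subscribed consumer receives every packet.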
