Need Help: GenStage - Recording Messages To Database & Handle Database Downtime Situations

I’m trying to build a simple script that records chat messages to the database, so if a user is offline, we can load up those messages later. I’ve been doing a bit of studying and I think GenStage is what I’m looking for. I found a few tutorials and guides here, along with the docs: https://hexdocs.pm/gen_stage/GenStage.html

I think what I need is a “push-based” setup, but I’m not entirely sure; I’m still learning and need some advice. It needs to handle a lot of incoming messages, including big spikes of messages at random times.

So far, I’ve set up a Producer > Producer/Consumer > Consumer setup, although it’s not complete yet and some things are not working the way I want. Here is what I have so far:

  1. Producer: Collects chat messages that come in a %{} map format:
defmodule ExchatWeb.Producer do
  use GenStage

  def start_link(_args) do
    GenStage.start_link(__MODULE__, [], name: __MODULE__)
  end

  def init(_args) do
    {:producer, []}
  end

  def enqueue(event) do
    # function to receive a new chat message from the app in the form of a %{} map, send to handle_cast
    GenStage.cast(__MODULE__, {:enqueue, event})
  end

  def handle_cast({:enqueue, event}, state) do
    # send chat message straight to ProducerConsumer
    {:noreply, [event], state}
  end

  def handle_demand(_incoming_demand, state) do
    # no demand, do nothing here
    {:noreply, [], state}
  end
end
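
A note on the producer above: because handle_demand/2 ignores the demand it is given, any event cast while nobody is asking relies on GenStage's internal buffer (bounded by the :buffer_size option), and events can be discarded once that buffer is full. A common alternative is to keep the queue and the unmet demand in the stage's own state. The following is a minimal sketch of that bookkeeping in plain Elixir; DemandBuffer and its function names are hypothetical helpers, not part of GenStage.

```elixir
defmodule DemandBuffer do
  # Hypothetical helper (not a GenStage API): tracks queued events and unmet demand.
  defstruct queue: :queue.new(), pending_demand: 0

  def new, do: %__MODULE__{}

  # Store a new event, then release whatever the outstanding demand allows.
  def enqueue(%__MODULE__{queue: queue} = buffer, event) do
    dispatch(%{buffer | queue: :queue.in(event, queue)})
  end

  # Record additional demand, then release whatever is already queued.
  def ask(%__MODULE__{pending_demand: pending} = buffer, demand) when demand > 0 do
    dispatch(%{buffer | pending_demand: pending + demand})
  end

  defp dispatch(buffer, acc \\ [])

  # No demand left: return the events collected so far, oldest first.
  defp dispatch(%__MODULE__{pending_demand: 0} = buffer, acc) do
    {Enum.reverse(acc), buffer}
  end

  defp dispatch(%__MODULE__{queue: queue, pending_demand: pending} = buffer, acc) do
    case :queue.out(queue) do
      {{:value, event}, rest} ->
        dispatch(%{buffer | queue: rest, pending_demand: pending - 1}, [event | acc])

      {:empty, _queue} ->
        # Queue drained: keep the leftover demand for events that arrive later.
        {Enum.reverse(acc), buffer}
    end
  end
end

buffer = DemandBuffer.new()
{[], buffer} = DemandBuffer.enqueue(buffer, %{body: "hi"})
{[], buffer} = DemandBuffer.enqueue(buffer, %{body: "hello"})
{first, buffer} = DemandBuffer.ask(buffer, 1)
{second, buffer} = DemandBuffer.ask(buffer, 5)
IO.inspect({first, second, buffer.pending_demand})
```

In a producer, handle_cast/2 could return {:noreply, events, buffer} from DemandBuffer.enqueue/2, and handle_demand/2 could do the same with DemandBuffer.ask/2, so nothing is emitted beyond what was asked for.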

  2. Producer / Consumer: Receives messages that come in from Producer and adds them to state. The Consumer is supposed to request messages when it’s ready. A major thing I still need to add here (but don’t know how) is saving the batch of messages that gets sent to Consumer, and emptying it only if Consumer was successful. Otherwise, if Consumer crashes because the database is temporarily down, the same batch should be resent to Consumer until it is successfully added to the database.
defmodule ExchatWeb.ProducerConsumer do
  use GenStage

  def start_link(_args) do
    GenStage.start_link(__MODULE__, [], name: __MODULE__)
  end

  def init(_args) do
    {:producer_consumer, [], subscribe_to: [ExchatWeb.Producer]}
  end

  def handle_events(event, _from, state) do
    IO.inspect("received chat message from producer, accumulate it to state list")
    IO.inspect([event | state])
    {:noreply, [event | state], [event | state]}
  end

  def handle_demand(demand, state) do
    # Incomplete - need to send only the demand requested, also need a way to handle consumer failure.
    IO.inspect("consumer requests state - send state (entire state as a test)")
    {:noreply, state, state}
  end
end
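
Two things worth checking in the stage above. First, GenStage only invokes handle_demand/2 on :producer stages, so the callback defined here is never called for a :producer_consumer. Second, the first argument of handle_events/3 is a list of events, so `[event | state]` puts the whole batch into the state as a single nested element. A small illustration with plain lists (the sample messages are made up):

```elixir
# Plain lists standing in for the stage state and one incoming batch.
state = [%{id: 1, body: "hi"}]
events = [%{id: 2, body: "hello"}, %{id: 3, body: "hey"}]

# Prepending the batch as one element nests a list inside the list.
nested = [events | state]
IO.inspect(length(nested), label: "nested length")

# Appending the batch keeps a flat list in arrival order.
flat = state ++ events
IO.inspect(Enum.map(flat, & &1.id), label: "flat ids")
```

Note also that returning the accumulated state as the events in {:noreply, events, state} re-emits every earlier message each time a new one arrives.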

  3. Consumer: Supposed to request 1 message every 5 seconds (in production, it will be 1000 messages).
defmodule ExchatWeb.Consumer do
  use GenStage

  def start_link(_args) do
    GenStage.start_link(__MODULE__, :ok, name: __MODULE__)
  end

  def init(:ok) do
    {:consumer, :ok, subscribe_to: [{ExchatWeb.ProducerConsumer, max_demand: 1, interval: 5000}]}
  end

  def handle_events(events, _from, state) do
    IO.inspect("receive events from ProducerConsumer, save to database")
    {:noreply, [], state}
  end
end

Things that I need, but don’t know how to do: The purpose of the Producer/Consumer stage is to hold messages in one big list for the Consumer to collect. When the Consumer is ready, it should ask for messages. However, from what I understand, if the database is down or there is some sort of refused connection or slowdown, the Consumer will crash and restart, and I don’t want to lose the batch of messages that was just sent to it.

So I need a way for Producer/Consumer to temporarily store that batch until the Consumer responds or asks for more. Producer/Consumer needs some way to know whether Consumer crashed, so it can decide whether to send the same batch of messages again or delete that batch and move on to the next one. For example, if the demand from the Consumer is 1000 messages, it should delete the earliest 1000 messages from the state list, but only if Consumer successfully processed those messages and didn’t crash. Hopefully this makes sense.
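
The "keep the batch until it is confirmed" bookkeeping described above can be sketched in plain Elixir. PendingBatch and its functions are hypothetical names, and the sketch assumes the Consumer sends some confirmation of its own (for example a cast to the upstream stage) after a successful database write, since GenStage does not acknowledge events by itself. For detecting a crash, producer stages do receive a handle_cancel/3 callback when a subscription is cancelled or the consumer goes down.

```elixir
defmodule PendingBatch do
  # Hypothetical helper: `queue` holds messages not yet handed out,
  # `in_flight` holds the batch that is still waiting for confirmation.
  defstruct queue: [], in_flight: []

  def new(messages \\ []), do: %__MODULE__{queue: messages}

  # Append newly received messages to the end of the queue.
  def add(%__MODULE__{queue: queue} = state, messages) do
    %{state | queue: queue ++ messages}
  end

  # Nothing unconfirmed: hand out up to `demand` of the oldest messages
  # and remember them as in flight.
  def checkout(%__MODULE__{in_flight: []} = state, demand) do
    {batch, rest} = Enum.split(state.queue, demand)
    {batch, %{state | queue: rest, in_flight: batch}}
  end

  # A batch is still unconfirmed (e.g. the consumer crashed): hand it out again.
  def checkout(%__MODULE__{in_flight: batch} = state, _demand), do: {batch, state}

  # The consumer confirmed the write: forget the in-flight batch.
  def ack(%__MODULE__{} = state), do: %{state | in_flight: []}
end

state = PendingBatch.new([:m1, :m2, :m3])
{first_try, state} = PendingBatch.checkout(state, 2)
# No confirmation arrived, so the next checkout returns the same batch.
{retry, state} = PendingBatch.checkout(state, 2)
state = PendingBatch.ack(state)
{next, _state} = PendingBatch.checkout(state, 2)
IO.inspect({first_try, retry, next})
```

With this shape, a restarted Consumer that resubscribes and asks again would get the unconfirmed batch first, and the earliest messages are only dropped after ack/1.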

One problem that I noticed immediately is that Consumer isn’t obeying the interval: 5000 rule. It’s just instantly receiving everything from the Producer/Consumer every time a message gets sent. In the example, I would like to see just 1 message come in every 5 seconds.
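
A likely explanation, as far as I can tell from the GenStage docs: :interval is not an option that GenStage acts on by itself. In the rate-limiting example in the docs, it is a custom option that the consumer reads in its own handle_subscribe/4, which switches the subscription to manual demand and then calls GenStage.ask/2 on a timer. Below is a sketch adapted from that example. It assumes gen_stage is already a dependency of the project; RateLimitedConsumer is a hypothetical module name, and the database write is left as a comment.

```elixir
defmodule ExchatWeb.RateLimitedConsumer do
  use GenStage

  def start_link(_args) do
    GenStage.start_link(__MODULE__, :ok, name: __MODULE__)
  end

  def init(:ok) do
    # State maps each producer subscription to {pending_demand, interval}.
    {:consumer, %{},
     subscribe_to: [{ExchatWeb.ProducerConsumer, max_demand: 1, interval: 5_000}]}
  end

  # Read the custom :interval option here and take over demand manually.
  def handle_subscribe(:producer, opts, from, producers) do
    pending = opts[:max_demand] || 1_000
    interval = opts[:interval] || 5_000

    producers =
      producers
      |> Map.put(from, {pending, interval})
      |> ask_and_schedule(from)

    {:manual, producers}
  end

  def handle_cancel(_reason, from, producers) do
    {:noreply, [], Map.delete(producers, from)}
  end

  def handle_events(events, from, producers) do
    # Whatever was received may be asked for again on the next tick.
    producers =
      Map.update!(producers, from, fn {pending, interval} ->
        {pending + length(events), interval}
      end)

    # Placeholder: this is where the batch would be written to the database.
    IO.inspect(events, label: "would save to database")
    {:noreply, [], producers}
  end

  def handle_info({:ask, from}, producers) do
    {:noreply, [], ask_and_schedule(producers, from)}
  end

  # Ask for the pending amount, then schedule the next ask after `interval` ms.
  defp ask_and_schedule(producers, from) do
    case producers do
      %{^from => {pending, interval}} ->
        GenStage.ask(from, pending)
        Process.send_after(self(), {:ask, from}, interval)
        Map.put(producers, from, {0, interval})

      %{} ->
        producers
    end
  end
end
```

With max_demand: 1 and interval: 5_000 this should ask for at most one message every five seconds, provided the upstream stage only emits what has been asked for.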

If you want error handling, batching and acknowledgement, have you tried doing this with Broadway? They come built-in.

I’ve heard of Broadway and RabbitMQ, but I’m not sure exactly what the differences are or what they would be for. At the moment, I’m still struggling to wrap my head around the basic GenStage that I would need before I could even learn Broadway/RabbitMQ.

Broadway does not require you to use RabbitMQ etc. It is built on top of GenStage and you can easily create your own producers, instead of using Rabbit, Kafka, SQS etc. For example, to add data to a pipeline in my app, I send a message to the producer and in the ack callback, I do some clean up, forward successful messages onto another pipeline and retry failed ones.

This book makes it all pretty clear.

Thanks for the link, it sounds like a good book, so I’ve purchased it. Years ago, I had to buy books to learn PHP and MySQL and now I live and breathe the stuff. Elixir seems special enough to really take the time to study and learn the language, even though there are nowhere near as many examples available online as there were for PHP/MySQL when I was learning it.