Need Help Building Simple Database Persistence: GenStage & Broadway?

Greetings,

I’m having a hell of a time trying to build a really simple script to collect chat messages and save them to the database… Unfortunately, I’m still too new to the language and things haven’t “clicked” yet in my mind. I recently bought this book to learn about genstage and broadway: Concurrent Data Processing in Elixir: Fast, Resilient Applications with OTP, GenStage, Flow, and Broadway by Svilen Gospodinov . While I did learn quite a bit more about the technologies, the examples provided there just didn’t apply to my specific case and I can’t find any examples to study. At the end of the book, he describes something about being able to set up persistence queue, but doesn’t actually provide the example.

What I need is a something that collects individual chat messages that come in the form of pre-formatted maps %{} and adds them to queue, or a big list. Example:

%{userid: 2, message: “hello test test test”, timestamp: …}
%{userid: 15, message: “how are you”, timestamp: …}
%{userid: 2, message: “foo bar”, timestamp: …}

If I have 1000 messages a minute, I would like to set up something scalable that will take batches of 10 message every 10 seconds and save them to mysql database using “insert_all”.

If the database is down or there is a connection error, I would like the script to continue to retry inserting the messages to the database without losing them, and keep trying for hours if needed, as our database has been known to go down for that long in rare cases.

I don’t know why I need to use RabbitMQ. I would like to just save these maps (messages) into a big simple list without having to set up a whole RabbitMQ server. Right now, I’m just trying to set up something really simple, and maybe when I become more knowledgeable, I could look into RabbitMQ again. I set up the custom transformation in the code below, as the book described, so I wouldn’t have to use RabbitMQ.

Here is what I have so far:

RoomChannel.ex: The function that sends the chat message map to the queue:

defmodule ExchatWeb.RoomChannel do
  use Phoenix.Channel
  alias ExchatWeb.Presence

  import Ecto.Query
  alias Exchat.Repo

  def handle_in("new_msg", %{"body" => body}, socket) do
    if (socket.assigns.is_anon == 0) do
      IO.inspect("user is logged in - send message")
      # This function sets the chat message to the database persistence script, formatted for database insert.
      :ok = ExchatWeb.TestProducer.save_message([%{message: body, userid: String.to_integer(socket.assigns.user_id), roomid: socket.topic, timestamp: DateTime.truncate(DateTime.utc_now(), :second)}])
      # sends the chat message to the chat app
      broadcast!(socket, "new_msg", %{body: body})
      {:noreply, socket}
    else
      IO.inspect("user is anonymous - do not send message")
      {:noreply, socket}
    end
  end
end

SaveTest.ex: Broadway Batcher:

defmodule ExchatWeb.TestSave do
  use Broadway
  require Logger

  def start_link(_args) do
    options = [
      name: ExchatWeb.TestSave,
      producer: [
        module: {ExchatWeb.TestProducer, []},
        transformer: {ExchatWeb.TestSave, :transform, []}
      ],
      processors: [
        default: [max_demand: 1, concurrency: 1]
      ],
      batchers: [
        default: [batch_size: 1, concurrency: 1, batch_timeout: 10_000]
      ]
    ]

    Broadway.start_link(__MODULE__, options)
  end

  def transform(event, _options) do
    %Broadway.Message{
      data: event,
      acknowledger: {ExchatWeb.TestSave, :pages, []}
    }
  end

  def ack(:pages, _successful, _failed) do
    :ok
  end

   def handle_message(_processor, message, _context) do
    if ExchatWeb.TestProducer.online?(message.data) do
      IO.inspect("handle message from Broadway")
      IO.inspect(message.data)
      Broadway.Message.put_batch_key(message, :default)
    else
      IO.inspect("handle message - failed")
      Broadway.Message.failed(message, "offline")
    end
  end

  def handle_batch(_batcher, [message], _batch_info, _context) do
    IO.inspect(message.data)
    IO.inspect("handle batch")
    IO.inspect([message])
    [message]
  end
end

TestProducer.ex: The producer used in the SaveTest.ex broadway batcher:

defmodule ExchatWeb.TestProducer do
  use GenStage
  require Logger

  def init(initial_state) do
    Logger.info("TestProducer init")
    {:producer, initial_state}
  end

  def handle_demand(demand, state) do
    Logger.info("TestProducer received demand for #{demand} pages")
    events = []
    {:noreply, events, state}
  end

  def save_message(pages) do
    ExchatWeb.TestSave
    |> Broadway.producer_names()
    |> List.first()
    |> GenStage.cast({:pages, pages})
  end

  def handle_cast({:pages, pages}, state) do
    {:noreply, pages, state}
  end

  def online?(_url) do
    # Pretend we are checking if the
    # service is online or not.
    # Select result randomly - (result will always be true for testing purposes).
    Enum.random([true, true, true])
  end
end

What I have here is Frankenstein code… taking examples from the book with parts that I won’t actually be using. Right now I’m trying to inspect and understand the flow of the messages, observe the queue, etc. I haven’t even gotten to the actual insert_all database call, nor the “retry” logic for database downtime yet. Right now, I’m just trying to figure out how to get the Producer queue to receive the individual chat messages (the maps %{...}) from the chat room and put them in a big list, or a queue. Then have the broadway batcher grab small batches of those messages out of the queue.

The first problem right now is that the producer receives the messages and it instantly ends up on the broadway script, it doesn’t wait 10 seconds, and it doesn’t come in multiple batches. It’s like there’s no queue or something.

To me, it seems like the solution and overall script has to be ridiculously simple, right? I’d greatly appreciate it if someone could provide a fully working example of this, and hopefully something will “click” in my head.

1 Like

Here, you are immediately putting all the pages you receive out for the processors to consume. When processes ask for demand in handle_demand, you’re giving them nothing. If you want to accrue jobs you need to queue the work in the producer’s state and keep track of the demand you haven’t served.

Your max demand and batch size is 1 so processors and batchers are going to take a message as soon as it arrives. You might want to play with those values.

I think of it like so:

  • producer keeps queue of work in it’s state
  • producer keeps track of how much demand has been requested by the processors but was unable to be served (remember the processors won’t keep polling for work after they request some and none is given, you have to push it to them once it arrives)
  • things send work to the producer or the producer requests work from somewhere
  • when jobs are enqueued to the producer, if demand has accrued, push that many messages out and reduce the accrued demand in the state, add the remaining jobs to the queue

If your database is down for hours you might end up with a lot of messages getting dropped, unless you use a producer that will persist them to disk for you, e.g. RabbitMQ.

What made it all click for me was building the pipeline in that book with a Logger/IO.puts in every function and watching it go round and round.

2 Likes

I believe in your case RabbitMQ is needed as a persistent message queue, i.e. if your DB server is down for hours you’ll just use RabbitMQ as an accumulated log of records to persist which is persisted itself.

You can do away with it and just accumulate messages in a plain GenServer message queue – or use :ets – but you are risking loss of all messages if your Elixir node goes down in the meantime because those methods are just in-memory queues. RabbitMQ can be made persistent.

Apart from that I am not even sure you need Broadway to be honest. I’d first try real hard to accept messages, queue them in RabbitMQ and have a supervised worker that periodically wakes up, pulls N messages and attempts to store them in the DB. If succeeded, you can ack the messages in RabbitMQ (which deletes them). If failed, you nack them (which keeps them in RabbitMQ). Sleep for X seconds, rinse and repeat. I likely don’t know your entire code and requirements but as I am describing it I’d easily fit the above in 3-4 files.

So IMO try take the more simple and “vanilla” route first?

3 Likes

Side question: what’s the benefit of storing state in :ets as opposed to just plain genserver state? If node goes down, so do both genserver and ets together, don’t they?

If you have only ~1000 messages per minute and you want to make sure that all the messages are persisted even if the DB goes down from time to time, and if you don’t want to setup your own RabbitMQ node, then use AWS SQS - it’s trivially simple and cheap in such cases. Then you would use Broadway for pulling the messages from SQS and inserting them into the DB if it is operational.

2 Likes

Sure, you’ll lose state either way. But if you have a huge number of queued up messages it would be more performant to use :ets because GenServers aren’t supposed to have, say, 50K pending messages in their queue for an hour or so. It’ll still work but it’ll be slower to fetch them. (Although I am not sure the speed difference will be perceptible for a human, never benchmarked that.)

1 Like

I kinda suspected that what I’m try to set up could be so simple that Broadway might not even be necessary. I’m just not experienced enough to know what setup to use. I would like to learn both methods.

As for the strategy of queueing messages (either in Rabbit or Broadway Producer), and periodically pulling them, and using ack/nack, it all makes perfect sense in my head, but I’m having a really hard time translating that into Elixir coding. Can you provide some code examples to look at?

Also, from what I understand, if the database is down, I believe the process or worker also crashes. Through the code, how do you set it up so that nack can be determined in this case?

Maybe somebody else can suggest you a good book to learn the parallel basics in Erlang / Elixir. I personally would just read the main GenServer docs and start from there without reaching for RabbitMQ just yet.

But if you do insist on RabbitMQ, I suggest you start here: AMQP — amqp v3.0.1. It’s a pretty good guide and I was off to the races in 5-10 minutes.

I did read the GenServer docs and a lot of that made sense. @cmkarlsson actually provided an excellent example that was similar to what I was looking for, and really helped with learning GenServer: Questions about saving chat messages to database. Failed inserts? Spawning? More Efficient Way? - #2 by cmkarlsson . However, he explained that the GenServer stops receiving new messages while it saves current message to the database, and it didn’t provide back pressure, inserting in batches to prevent overloading database, or a way to handle database downtime (retry inserts later). The example was pretty darn close to what I needed.

After studying the GenServer docs and reading more, I discovered GenStage might be a better way to do this, and that Broadway might be the way to handle database downtime / failed inserts.

I’ll read the AMQP guide that you provided and see if I can get more insights. Also for reference, I found a great guide for installing RabbitMQ on Ubuntu 20.04 and getting it up and running: https://computingforgeeks.com/how-to-install-latest-rabbitmq-server-on-ubuntu-linux/ , no unexpected install problems.

1 Like

re: RabbitMQ, I’d advise you to just get the official Docker image and connect to that. It also allows guest authentication which you normally have to setup yourself with a few more steps (trust me, not worth the time – but it won’t take you more than 30 minutes so you decide).

After clearing up some life distractions and studying more. I think I’m getting closer to a RabbitMQ/Broadway solution. Here is what I have so far:

defmodule ExchatWeb.TestSave do
  use Broadway
  use AMQP

  @producer BroadwayRabbitMQ.Producer

  @producer_config [
    queue: "save_chats_queue",
    declare: [durable: true],
    on_failure: :reject_and_requeue,
    qos: [prefetch_count: 10]
  ]

  def start_link(_args) do
    options = [
      name: ExchatWeb.TestSave,
      producer: [
        module: {@producer, @producer_config}
      ],
      processors: [
        default: []
      ],
      batchers: [
        default: [concurrency: 1, batch_size: 10, batch_timeout: 5000]
      ]
    ]

    Broadway.start_link(__MODULE__, options)
  end

  def publish(chat_message) do
    IO.inspect("chat message to Rabbitmq")
    # Receive chat room messages and publish it to RabbitMQ queue
    {:ok, connection} = AMQP.Connection.open
    {:ok, channel} = AMQP.Channel.open(connection)
    AMQP.Queue.declare(channel, "save_chats_queue", [durable: true])
    AMQP.Basic.publish(channel, "", "save_chats_queue", chat_message)
    AMQP.Connection.close(connection)
  end

  def handle_message(_, message, _) do
    IO.inspect("handle message")
    message
  end

  def handle_batch(_, messages, _, _) do
    IO.inspect("handle batch")
    list = messages |> Enum.map(fn e -> e.data end)
    IO.inspect(list, label: "Got batch")
    Process.sleep(10000)
    # Do Repo.insert_all(list) here. Somehow figure out if there is a failure or not, somehow tell RabbitMQ to re-queue messages upon failure?
    messages
  end


end

First, I just want someone to confirm if my thinking and concepts are correct regarding basic RabbitMQ and Broadway. I’m sort of figuring things out as I go, but I’m not sure if I understand it completely:

  1. batch_timeout variable in start_link batchers simply runs the batch every 5 sec if batch size is smaller than declared batch_size - it does NOT actually slow the process to run every 5 seconds, that must be done in the handle_batch function.

  2. handle_message gets message from rabbitmq immediately and individually as they come in.

  3. handle_batch runs immediately when batch_size is met (and accumulates via handle_message), or after batch_timeout if it’s a small batch.

  4. While batch is running, handle_message does not get new messages from rabbitmq until handle_batch is finished.

  5. SQL insert_all is called in handle_batch - and can run for 5 minutes if necessary without rabbitmq/broadway piling on messages into handle_batch/handle_message - but new messages from the chat room channel will still continue to be published in rabbitmq using the publish() function above.

  6. If failure happens in handle_batch or the sql query, script will crash and restart, or if there is an error message from SQL, I can somehow send the fail flag to reject_and_requeue so rabbitmq keeps the failed messages and broadway retries those later.


The first problem is with the publish() function. This function is receiving the message from the chat room, but it’s not saving to RabbitMQ. I’ve ensured that the queue exists in RabbitMQ and have tested it by running manual commands using the command line “iex -S Mix”, but won’t work on an .ex module script. Is this where/how I should be saving the messages to RabbitMQ? Also, if handle_batch is running an SQL call, is this a type of genserver that will block publish() from saving new messages to RabbitMQ ??

The other big problem: If I am running the SQL insert_all query on a batch (or list) of messages and it fails, how do I send the “flag” to reject_and_requeue for the batch of messages?

Made a little progress today: figured out that RabbitMQ can’t save maps %{}, which is why nothing was publishing to the queue. I am using :erlang.term_to_binary/binary_to_term to encode/decode and it’s working pretty good so far.

However, a big problem still remains. How do I mark the batch as failed if the mysql insert fails? Here is what I have:

def handle_batch(_, messages, _, _) do
    IO.inspect("handle batch")
    list = messages |> Enum.map(fn e -> :erlang.binary_to_term(e.data) end)
    case Exchat.Repo.insert_all(SaveChats,list) do
      {_, nil} ->
        IO.inspect("Saved To Database - No Error Message Returned")
      {0, _} ->
        IO.inspect{"Insert Failed"}
        Broadway.Message.failed(messages, "insert_failed")
    end
    messages
  end

If the insert fails, I try to use Broadway.Message.failed(messages, "insert_failed") but that doesn’t work.

1 Like

Still need help with this. When I try running Broadway.Mesage.failed(messages, "fail"), I get this error message:

[error] ** (FunctionClauseError) no function clause matching in Broadway.Message.failed/2
    (broadway 1.0.1) lib/broadway/message.ex:139: Broadway.Message.failed([%Broadway.Message{acknowledger: {BroadwayRabbitMQ.Producer, %AMQP.Channel{conn: %AMQP.Connection{pid: #PID<0.568.0>}, custom_consumer: {AMQP.SelectiveConsumer, #PID<0.554.0>}, pid: #PID<0.577.0>}, %{client: BroadwayRabbitMQ.AmqpClient, delivery_tag: 1, on_failure: :reject_and_requeue, on_success: :ack, redelivered: true}}, batch_key: :default, batch_mode: :bulk, batcher: :default, data: <<131, 116, 0, 0, 0, 4, 100, 0, 7, 109, 101, 115, 115, 97, 103, 101, 109, 0, 0, 0, 4, 97, 115, 100, 102, 100, 0, 6, 114, 111, 111, 109, 105, 100, 109, 0, 0, 0, 10, 114, 111, 111, 109, 58, ...>>, metadata: %{amqp_channel: %AMQP.Channel{conn: %AMQP.Connection{pid: #PID<0.568.0>}, custom_consumer: {AMQP.SelectiveConsumer, #PID<0.554.0>}, pid: #PID<0.577.0>}}, status: :ok}], "fail")
    (exchat 0.1.0) lib/exchat_web/broadway/savetest.ex:45: ExchatWeb.TestSave.handle_batch/4
    (broadway 1.0.1) lib/broadway/topology/batch_processor_stage.ex:100: Broadway.Topology.BatchProcessorStage.handle_batch/4
    (broadway 1.0.1) lib/broadway/topology/batch_processor_stage.ex:56: anonymous fn/5 in Broadway.Topology.BatchProcessorStage.handle_events/3
    (telemetry 0.4.3) /home/exchat/exchat/deps/telemetry/src/telemetry.erl:272: :telemetry.span/3
    (gen_stage 1.1.2) lib/gen_stage.ex:2471: GenStage.consumer_dispatch/6
    (stdlib 3.16.1) gen_server.erl:695: :gen_server.try_dispatch/4
    (stdlib 3.16.1) gen_server.erl:771: :gen_server.handle_msg/6
    (stdlib 3.16.1) proc_lib.erl:226: :proc_lib.init_p_do_apply/3

I’m trying to mark the entire batch as “failed”. I’m not sure why this isn’t working. I even added a handle_failed function.

 def handle_failed(messages, "fail") do
    for message <- messages do
      Broadway.Message.configure_ack(message, retry: true)
    end
  end

But I get an error for that as well:

[error] ** (FunctionClauseError) no function clause matching in ExchatWeb.TestSave.handle_failed/2
    (exchat 0.1.0) lib/exchat_web/broadway/savetest.ex:55: ExchatWeb.TestSave.handle_failed([%Broadway.Message{acknowledger: {BroadwayRabbitMQ.Producer, %AMQP.Channel{conn: %AMQP.Connection{pid: #PID<0.568.0>}, custom_consumer: {AMQP.SelectiveConsumer, #PID<0.554.0>}, pid: #PID<0.577.0>}, %{client: BroadwayRabbitMQ.AmqpClient, delivery_tag: 1, on_failure: :reject_and_requeue, on_success: :ack, redelivered: true}}, batch_key: :default, batch_mode: :bulk, batcher: :default, data: <<131, 116, 0, 0, 0, 4, 100, 0, 7, 109, 101, 115, 115, 97, 103, 101, 109, 0, 0, 0, 4, 97, 115, 100, 102, 100, 0, 6, 114, 111, 111, 109, 105, 100, 109, 0, 0, 0, 10, 114, 111, 111, 109, 58, ...>>, metadata: %{amqp_channel: %AMQP.Channel{conn: %AMQP.Connection{pid: #PID<0.568.0>}, custom_consumer: {AMQP.SelectiveConsumer, #PID<0.554.0>}, pid: #PID<0.577.0>}}, status: {:error, %FunctionClauseError{args: nil, arity: 2, clauses: nil, function: :failed, kind: nil, module: Broadway.Message}, [{Broadway.Message, :failed, [[%Broadway.Message{acknowledger: {BroadwayRabbitMQ.Producer, %AMQP.Channel{conn: %AMQP.Connection{pid: #PID<0.568.0>}, custom_consumer: {AMQP.SelectiveConsumer, #PID<0.554.0>}, pid: #PID<0.577.0>}, %{client: BroadwayRabbitMQ.AmqpClient, delivery_tag: 1, on_failure: :reject_and_requeue, on_success: :ack, redelivered: true}}, batch_key: :default, batch_mode: :bulk, batcher: :default, data: <<131, 116, 0, 0, 0, 4, 100, 0, 7, 109, 101, 115, 115, 97, 103, 101, 109, 0, 0, 0, 4, 97, 115, 100, 102, 100, 0, 6, ...>>, metadata: %{amqp_channel: %AMQP.Channel{conn: %AMQP.Connection{pid: #PID<0.568.0>}, custom_consumer: {AMQP.SelectiveConsumer, #PID<0.554.0>}, pid: #PID<0.577.0>}}, status: :ok}], "fail"], [file: 'lib/broadway/message.ex', line: 139]}, {ExchatWeb.TestSave, :handle_batch, 4, [file: 'lib/exchat_web/broadway/savetest.ex', line: 45]}, {Broadway.Topology.BatchProcessorStage, :handle_batch, 4, [file: 'lib/broadway/topology/batch_processor_stage.ex', line: 100]}, {Broadway.Topology.BatchProcessorStage, :"-handle_events/3-fun-0-", 5, [file: 'lib/broadway/topology/batch_processor_stage.ex', line: 56]}, {:telemetry, :span, 3, [file: '/home/exchat/exchat/deps/telemetry/src/telemetry.erl', line: 272]}, {GenStage, :consumer_dispatch, 6, [file: 'lib/gen_stage.ex', line: 2471]}, {:gen_server, :try_dispatch, 4, [file: 'gen_server.erl', line: 695]}, {:gen_server, :handle_msg, 6, [file: 'gen_server.erl', line: 771]}, {:proc_lib, :init_p_do_apply, 3, [file: 'proc_lib.erl', line: 226]}]}}], :context_not_set)
    (broadway 1.0.1) lib/broadway/acknowledger.ex:102: Broadway.Acknowledger.handle_failed_messages/3
    (broadway 1.0.1) lib/broadway/topology/batch_processor_stage.ex:59: anonymous fn/5 in Broadway.Topology.BatchProcessorStage.handle_events/3
    (telemetry 0.4.3) /home/exchat/exchat/deps/telemetry/src/telemetry.erl:272: :telemetry.span/3
    (gen_stage 1.1.2) lib/gen_stage.ex:2471: GenStage.consumer_dispatch/6
    (stdlib 3.16.1) gen_server.erl:695: :gen_server.try_dispatch/4
    (stdlib 3.16.1) gen_server.erl:771: :gen_server.handle_msg/6
    (stdlib 3.16.1) proc_lib.erl:226: :proc_lib.init_p_do_apply/3

All I really need to do is mark the batch of messages as failed so they can be retried later. I actually don’t need to modify the failed messages or anything, so the handle_failed call might not even be necessary. Unfortunately, in both cases I don’t know how to do this in elixir. The script is almost finished except for this last part.

As per the error message, Broadway.Mesage.failed doesn’t take a list of messages, just one. Try Enum.map/for.

IIRC, you only need handle_failed for messages that raise an exception.

1 Like

Question from a different perspective:

Is something like GeenStage also used to prevent database overload?

Example: I have a simple GenServer that works as a “write buffer” or maybe a simple queue. It collects thousands of incoming records in its state, and then writes them in bulk, either every 100 records at once, or every 5 seconds. The main goal is not to do thousands of concurrent writes to postgres.

I was wondering, rather than trying to figure out how often the genserver should do the bulk write, whether maybe instead postgres should become a genstage consumer and request data as fast as it it able to process it? Is this a meaningful use case for GenStage?

Awesome, I think I actually got the full script working the way I want now! I can build on that for other features later on, pretty neat.

Now I have a question about how to publish to RabbitMQ more efficiently. Right now, I am using these functions in my room_channel.ex script:

 def handle_in("new_msg", %{"body" => body}, socket) do
      :ok = publish(%{message: body, userid: String.to_integer(socket.assigns.user_id)})
      broadcast!(socket, "new_msg", %{body: body})
      {:noreply, socket}
  end

 def publish(chat_message) do
    # Receive chat room message map and publish it to RabbitMQ queue
    {:ok, connection} = AMQP.Connection.open
    {:ok, channel} = AMQP.Channel.open(connection)
    AMQP.Basic.publish(channel, "", "save_chats_queue", :erlang.term_to_binary(chat_message))
    AMQP.Connection.close(connection)
  end

The big question is about the publish function. As you can see, I’m opening and closing a connection every single time a message comes in. What is the correct way of doing this? Should I put this publish function in my Broadway/RabbitMQ script instead? I’m assuming that script always has a persistent connection to RabbitMQ already open, but I don’t know how to use it (the name of the connection/channel variables).

And if the connection to RabbitMQ fails, does the main Broadway script automatically reconnect?

Still need help on this, can’t find any examples online for my case except for the basic tutorial example (which I’m using above), which I’m not sure works well for full production.

With the publish() function above, is the standard practice to be opening and closing connections every single time I post a message to RabbitMQ? Is RabbitMQ (and/or Elixir) going to be able to handle the constant opening/closing of connections like this if 1000s of messages come in each second?

And should I put this function in with my Broadway script, or just set up a separate module dedicated just for publishing to RabbitMQ and leave the Broadway script for receiving/processes messages from RabbitMQ?

I’ve not used Rabbit, but I don’t see why you would open and close a connection repeatedly.