GenStage patterns

I’m fairly new to Elixir, and so far I’m finding it very exciting. So many of my concurrency problems seem to be solved in a simple, elegant way. Now that I’ve learnt the basics, I’m trying to write some applications and am having trouble working out the right patterns.

First, I was using Wabbit as a GenStage producer (consuming from a RabbitMQ queue), doing some processing on the messages and then inserting them into a DB. It worked great locally, at around 8,000 messages/s. When I moved it to a production environment, after adding a lot of logging I worked out that it was choking at the sink end (backpressure was working!) because of write latency. Locally I would get, say, 500 events arriving at the consumer at a time, but in production I would get 1 at a time, which meant higher write latency per message.

My solution was to create a stage which buffered up to x events, or for x ms (whichever came first). To be more concrete, here is the code.

# This stage buffers events and handles the acks back to RabbitMQ
defmodule RabbitAckker do
  use GenStage
  require Logger

  def start_link() do
    GenStage.start_link(__MODULE__, :ok)
  end

  def init(:ok) do
    Process.send_after(self(), {:flush}, 500) # first message is fairly soon after startup
    {:producer_consumer, []}
  end

  # handle events coming from rabbit
  def handle_events(events, _from, state) do
    event_buffer = state
    new_event_buffer = event_buffer ++ events
    messages_before_purge = 400

    if (Enum.count(new_event_buffer) >= messages_before_purge) do
      for {_event, meta} <- new_event_buffer do
        :ok = ack(meta.channel, meta.delivery_tag)
      end

      # pass all events down to the next stage
      {:noreply, new_event_buffer, []}
    else
      # don't pass any events down, just store them in the state
      {:noreply, [], new_event_buffer}
    end
  end

  # when this flush message comes, ack the events and pass them down.
  # TODO: only flush when we haven't acked anything in flush_time ms
  def handle_info({:flush}, state) do
    flush_time = 2_000 # ms
    event_buffer = state

    event_count = Enum.count(state)
    if event_count > 0 do
      Logger.debug("ackker: timer flush called for #{event_count} events")
    end
    # ack all the events we will flush
    for {_event, meta} <- event_buffer do
      :ok = ack(meta.channel, meta.delivery_tag)
    end
    
    # call this again in another flush_time ms
    Process.send_after(self(), {:flush}, flush_time)
    # move event_buffer to events, and reset the state
    {:noreply, event_buffer, []}
  end

  defp ack(channel, delivery_tag) do
    try do
      Wabbit.Basic.ack(channel, delivery_tag)
    catch
      _, _ ->
        :ok
    end
  end
end

This works great! I realise that I have the potential for data loss if the process is killed before flush_time, but you have that problem with any sort of buffering.

My first question is, is there a better way to do this?

Now on to my next problem!

In addition to writing to a datastore, I would like to push the messages out to multiple websockets. Sometimes the consumers at the end of a websocket are slow, and each of them has different performance characteristics. How can I drop messages to them if they aren’t keeping up? I am trying to avoid an unbounded buffer. I also want to write to the datastore at the same time, but I want the datastore to get every message and apply backpressure (via demand), as in the example above.

Any suggestions on a good pattern for this?

Thanks for taking the time to read through my whole question!

I had a similar situation a long time ago, and I solved it with adaptive buffering using two processes. One process enqueues incoming items, while the other performs the writes. The writer reports to the queue that it’s available. Then, as soon as the first item arrives, the queue process sends it to the writer and marks the writer as busy.

Now, while the writer is busy, the queue stores new items in its internal structure. Then, when the writer is done and reports back to the queue, the queue can send all the items to the writer at once, and the writer can store them at once. This way, the writer profits from the fact that writing N items at once is much faster than N writes of a single item.
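
As a rough sketch (the module names and MyStore.write_batch/1 are made up for illustration; supervision and error handling are omitted), the queue/writer pair could look something like this:

# Hypothetical sketch of the queue/writer pair described above.
defmodule BufferQueue do
  use GenServer

  def start_link(writer), do: GenServer.start_link(__MODULE__, writer, name: __MODULE__)

  def push(item), do: GenServer.cast(__MODULE__, {:push, item})

  def init(writer), do: {:ok, %{writer: writer, buffer: [], writer_busy?: false}}

  # New item and the writer is idle: hand it over immediately, mark the writer busy.
  def handle_cast({:push, item}, %{writer_busy?: false} = state) do
    send(state.writer, {:write, [item], self()})
    {:noreply, %{state | writer_busy?: true}}
  end

  # New item while the writer is busy: just store it.
  def handle_cast({:push, item}, state) do
    {:noreply, %{state | buffer: [item | state.buffer]}}
  end

  # Writer reports back with nothing buffered: mark it idle.
  def handle_info(:writer_done, %{buffer: []} = state) do
    {:noreply, %{state | writer_busy?: false}}
  end

  # Writer reports back: flush everything accumulated meanwhile as one batch.
  def handle_info(:writer_done, state) do
    send(state.writer, {:write, Enum.reverse(state.buffer), self()})
    {:noreply, %{state | buffer: [], writer_busy?: true}}
  end
end

defmodule Writer do
  def start_link, do: {:ok, spawn_link(&loop/0)}

  defp loop do
    receive do
      {:write, items, queue} ->
        MyStore.write_batch(items) # made-up function: one DB round trip for N items
        send(queue, :writer_done)
        loop()
    end
  end
end

Whether the producers push with a cast or a call also determines whether you can make them wait; a call would give you backpressure on top of the batching.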

I’m not familiar with GenStage, but I feel that this should be doable using two stages (consumer-producer for the queue, and consumer for the writer).

The nice benefit of this approach is that buffering is adaptable. If writing is faster than the rate of incoming messages, then there’s no buffering. Otherwise, the buffer will expand to accommodate the incoming rate.

Another nice benefit is that in the queue process you can do all sorts of trickery to handle overload, such as remove old items when the queue becomes large, reject new items if the queue is too large, or eliminate duplicate requests. I did all of those things in that project, and the result was pretty stable and resilient against all kinds of bursts, failures, latency increases and other problems.

Again, I’m not familiar with GenStage, so not sure if there’s an out-of-the-box solution for that, or you need to work a bit for it, but I believe that it should be possible with GenStage.

2 Likes

Currently you seem to be on a fixed-period “flush cycle”. Process.send_after/4 returns a timer reference which can be used with Process.cancel_timer/1 to cancel the previous timer if you happen to release events in handle_events - in fact, you should be able to factor out a common function between handle_events and handle_info for event release and timer renewal (see the sketch below).
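
For example (an untested sketch reusing the threshold and interval from your post; the state becomes a {buffer, timer_ref} tuple, with init/1 starting the first timer):

  # Sketch: init would return {:producer_consumer, {[], Process.send_after(self(), {:flush}, 500)}}
  def handle_events(events, _from, {buffer, timer}) do
    buffer = buffer ++ events

    if Enum.count(buffer) >= 400 do
      release(buffer, timer)
    else
      {:noreply, [], {buffer, timer}}
    end
  end

  def handle_info({:flush}, {buffer, timer}) do
    release(buffer, timer)
  end

  # Common path for both triggers: ack, renew the timer, forward the events.
  defp release(buffer, timer) do
    Process.cancel_timer(timer)

    for {_event, meta} <- buffer do
      :ok = ack(meta.channel, meta.delivery_tag)
    end

    new_timer = Process.send_after(self(), {:flush}, 2_000)
    {:noreply, buffer, {[], new_timer}}
  end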

I don’t know about patterns but based on your description I’d start with this:

  • Write a simple GenServer that is dedicated to a single WebSocket. The idea being that the GenServer’s mailbox becomes your “unbounded buffer” (i.e. let the VM handle it). The GenServer itself processes one message at a time and blocks until the WebSocket is finished sending the current message.
  • Have those “Observer” WebSocket GenServers register with an “Observable” GenServer (a reference to the Observer pattern, but not in an OO way). The “Observable” GenServer simply sends all the events it receives to all the registered “Observers” (immediately and in turn).
  • Finally, write a “Tap” GenStage (see the sketch after this list). Essentially, the “Tap” simply forwards all the events it receives - but not until it has sent a copy to another process (in this case the “Observable” GenServer). Now, it might be tempting to combine “Tap” and “Observable”, but the priority is to get the events to the datastore, so by sticking to a simple “Tap” GenStage, the delay in getting the events to the datastore is minimized to the time it takes to put a copy of the events into the “Observable’s” mailbox. Distribution of the events happens on the “Observable’s” time, and pushing messages into the WebSockets happens on the various “Observers’” time. (The separation also creates the opportunity for the “Observable” process to run on a different CPU core than the “Tap” process.)
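
A rough, untested sketch of the “Tap”/“Observable” part (the module names are just for illustration; the “Observer” GenServers would register themselves and handle {:events, events} in handle_info at their own pace):

defmodule Tap do
  use GenStage

  def start_link, do: GenStage.start_link(__MODULE__, :ok)

  def init(:ok), do: {:producer_consumer, :ok}

  def handle_events(events, _from, state) do
    # Put a copy into the Observable's mailbox first (a cast, so it never blocks) ...
    GenServer.cast(Observable, {:events, events})
    # ... then immediately forward the same events towards the datastore.
    {:noreply, events, state}
  end
end

defmodule Observable do
  use GenServer

  def start_link, do: GenServer.start_link(__MODULE__, [], name: __MODULE__)

  def register(observer), do: GenServer.cast(__MODULE__, {:register, observer})

  def init(observers), do: {:ok, observers}

  def handle_cast({:register, pid}, observers), do: {:noreply, [pid | observers]}

  # Distribution happens on the Observable's time, not the Tap's.
  def handle_cast({:events, events}, observers) do
    for pid <- observers, do: send(pid, {:events, events})
    {:noreply, observers}
  end
end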
1 Like

For reference, buffering messages in a process like this can generally be done in two ways when the receiving process requests a new message:

  1. The buffer waits for any message. When it gets one, it checks whether it is the request message; if so, it dumps the pending messages (or just the first) back to that process and waits again, otherwise it adds the message to its list and waits again.
  2. Back-buffered, with 2 styles here too:
  • The buffer listens for the specific request message; when it gets it, it listens for any message, returns that back to the requester, and then waits for the request message again.
  • The buffer listens for the specific request message; when it gets it, it flushes out its entire pending mailbox, dumps all of those messages to the requester, and then goes back to waiting for the specific request message again.

Style 1 entirely removes the back-buffering that the BEAM does between processes; you can fill your memory pretty easily here and die.
Style 2 properly propagates the back-buffering ‘backwards’, allowing the BEAM to naturally throttle process message sends to prevent overflows while still preventing mailbox message loss on process death.
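
For illustration, style 2 (the flush-everything variant) as a bare receive loop, assuming a single requesting process:

defmodule BackBuffer do
  def start_link, do: {:ok, spawn_link(&loop/0)}

  # Selectively wait for the request; everything else stays in the mailbox,
  # so the BEAM's sender penalty applies to whoever is flooding us.
  defp loop do
    receive do
      {:request, from} ->
        send(from, {:items, drain([])})
        loop()
    end
  end

  # On request, pull whatever is currently queued, without blocking.
  defp drain(acc) do
    receive do
      item -> drain([item | acc])
    after
      0 -> Enum.reverse(acc)
    end
  end
end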

I had style 2 as a module in my old Erlang projects; it is so trivial to write (a single-function loop or a simple GenServer) that I’ve never considered distributing it, but with how many new people are coming to Elixir, maybe I should release a mini-library for it?

I’m confused by this, or maybe I don’t understand what you’re aiming at.
If the buffer process is separate from the worker process (which is what I suggested), and in the buffer process you take every message immediately, then in that process you actually have a way of managing memory usage, because you can immediately decide what to do with each message - for example discard it if the buffer is full. This is not something you can do with a selective receive, because non-matched messages stay in the mailbox until you receive them, which might happen much later, depending on the write latency.

This is an interesting idea. However, on first thought I don’t think I quite like it, because it relies on BEAM internals. There’s also not much space for variation here: how can you refuse new messages or delete older messages if the buffer is full? If the buffer process waits for the specific message from the writer, and the writer is busy, then the buffer is stuck. In fact, with this approach it’s possible to fill up the entire memory.

In contrast, if the buffer immediately receives all incoming messages, it can do all sorts of things, such as making the producers wait, or discarding messages when full. But in particular, what I like about that approach (your number 1) is that the load control is explicit and written in code, instead of relying on BEAM internals.

Leaving work, so too little time to show examples (maybe someone else can), but the BEAM throttles processes that try to send a message to a process that is ‘too full’, reducing their reductions temporarily and so forth until the overload on the overall VM is reduced. This is really important for keeping the system working in the face of overloads, where a process that just eats and stores all messages can eat all the memory until the VM dies. :slight_smile:

I’m aware of that property. My point was that even with that you can still blow up memory, because messages pile up in the mailbox. Imagine a process which waits for the message :foo, which never arrives. All the other messages remain in the mailbox indefinitely. Therefore, even with the scheduler backpressure (reductions penalty), constantly producing other messages (not :foo) will lead to eventual memory overflow.

In contrast, if the process takes each message immediately, it can decide whether to store it, or discard it, or discard some other previously stored piece of data. By taking each message, the process actively prevents its mailbox from growing indefinitely.
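
To make that concrete, the buffer’s callback might look something like this (max_size and the drop policy are purely illustrative):

# Hypothetical sketch: the buffer takes every pushed item off its mailbox
# immediately and applies an explicit overload policy in code.
def handle_cast({:push, item}, %{buffer: buffer, max_size: max} = state) do
  if :queue.len(buffer) >= max do
    # Overload policy lives here: drop the new item
    # (alternatively, drop the oldest and enqueue the new one).
    {:noreply, state}
  else
    {:noreply, %{state | buffer: :queue.in(item, buffer)}}
  end
end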

That could really only happen if the messages all come from different processes, which would not be the case in a GenStage pipeline. The processes that repeatedly send messages to this process would be backpressured to slow down and/or stop until the messages get handled. :slight_smile:

If you don’t mind dropping some messages then this is certainly fine, but if not, then backpressure really should be built in.

I’m aware of the reductions penalty when sending a message to a process with a large mailbox, but I have never heard of the sender process being completely stopped - I’m not even sure how that would work. AFAIK the sender is penalized extra if the receiver has a large mailbox, but that can only mean it’s scheduled out sooner (maybe immediately). However, if the receiver is waiting for a different message, then it’s not going to progress, and the sender might again be scheduled in and produce more messages.

In the concrete case we’re discussing, imagine the database being ridiculously slow. The writer takes a long time to store, and many messages might pile up in the buffer’s mailbox before the writer is ready again. If the buffer selectively waits for the writer to report back, it can’t do a single thing about its mailbox, even if there’s only one producer.

Here’s a quick demo of this situation. In the code below, the producer never sends the message selectively received by the consumer.

# consumer: selectively waits for :foo, which never arrives
consumer = spawn(fn ->
  fn -> receive do :foo -> :ok end end
  |> Stream.repeatedly()
  |> Stream.run()
end)

# producer: floods the consumer's mailbox with :bar as fast as it can
_producer = spawn(fn ->
  fn -> send(consumer, :bar) end
  |> Stream.repeatedly()
  |> Stream.run()
end)

A brief one-minute experiment shows continuous memory growth, and it’s only a matter of time before we burn through all available memory. Scheduler backpressure and the reduction penalty won’t save us here. This is simply a consequence of the fact that we’re not taking every message from the mailbox as soon as it arrives, thus opening up the possibility of mailbox overflow.

It’s not just about dropping messages. You can also enforce backpressure by using calls in a GenServer, or by making the buffer a consumer stage in the pipeline. The nice thing here is that you can make some more elaborate decisions about how long you want to make the producer wait. If the buffer is small, you can send the ack message as soon as you receive an item from the producer. Otherwise, you can make the producer wait longer, until you catch your breath (i.e. until you consume some of the items).
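
For example (an illustrative fragment, assuming the buffer keeps a list of waiting producers and a max_size threshold in its state):

# Producers push with GenServer.call/2, so the buffer controls when they may continue.
def handle_call({:push, item}, from, %{buffer: buffer, max_size: max} = state) do
  state = %{state | buffer: [item | buffer]}

  if length(state.buffer) < max do
    # Plenty of room: reply immediately and the producer carries on.
    {:reply, :ok, state}
  else
    # Buffer is full: hold the reply, which blocks the producer in its call
    # until we drain some items and answer with GenServer.reply(from, :ok).
    {:noreply, %{state | waiting: [from | state.waiting]}}
  end
end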

Hey sasajuric,

I believe my code sample is doing what you are suggesting. Once the stage has accumulated 400 events or 2 seconds have elapsed, it forwards them. It’s not quite adaptive, but without thresholds I don’t see how you’d know the optimal number of messages to write at a time. There is no overload because demand is pushed back to the producer, which only creates as many items as the consumers ask for.

cheers,
Anko

In your solution, you wait for 2 seconds or 400 events, whichever comes first. That means there might be a fixed latency penalty if messages don’t arrive frequently. There is also the question of those numbers: how did you ensure they are neither too big nor too small?

In my proposal, the difference is that you write as soon as the writer is available. Therefore, if the events arrive less frequently, there will be no penalty. If the events arrive too frequently, the buffer will expand to compensate for that.

Let’s see an example. Consider a write which takes 100ms regardless of the data size. Let’s say that events arrive every 200ms. The writer is always available when the event arrives, so you write immediately (no latency penalty).

Now let’s say that events arrive every 10ms. The first event arrives and you write it immediately. For the next 100ms the writer is busy, so you’ll buffer the next 10 events. Then, when the writer reports back that it’s available, you write the buffered 10 events at once. The buffer size adapted to the incoming load.

Thanks Sasajuric,

I believe this is what GenStage is meant to do. When the writer is available, it sends demand (a number) back up through the processing pipeline, so the producer knows it can send up to that many events to the writer.

The problem is, it’s not working - which is why I introduced this extra buffering mechanism and why I’m asking the question.

I guess I wasn’t very clear about my first problem in my own mind. It’s really that I need help with debugging GenStage!

Thanks