Anko

GenStage patterns

I’m fairly new to Elixir, and so far I’m finding it very exciting. So many of my concurrency problems seem to be solved in a simple, elegant way. Now that I’ve learnt the basics, I’m trying to write some applications and am having trouble working out the right patterns.

First I was using Wabbit as a GenStage producer (consuming from a RabbitMQ queue). I was doing some processing on the messages and then inserting them into a DB. It worked great locally, at around 8000 messages/s. When I moved it to a production environment, I worked out after extensive logging that it was choking at the sink end (backpressure was working!) because of write latency. Locally I would get, say, 500 events to the consumer at a time, but in production I would get 1 at a time, so the write latency was paid on every single message.

My solution was to create a stage that buffered up to a fixed number of events or for a fixed number of milliseconds, whichever came first. To be more concrete, here is the code.

# This stage handles the acks back to RabbitMQ
defmodule RabbitAckker do
  use GenStage
  require Logger

  @messages_before_purge 400
  @flush_time 2_000 # ms

  def start_link() do
    GenStage.start_link(__MODULE__, :ok)
  end

  def init(:ok) do
    Process.send_after(self(), :flush, 500) # first flush is fairly soon after startup
    {:producer_consumer, []}
  end

  # handle events coming from rabbit; the state is the buffer of unreleased events
  def handle_events(events, _from, event_buffer) do
    new_event_buffer = event_buffer ++ events

    if Enum.count(new_event_buffer) >= @messages_before_purge do
      ack_all(new_event_buffer)
      # pass all events down to the next stage
      {:noreply, new_event_buffer, []}
    else
      # don't pass any events down, just store them in the state
      {:noreply, [], new_event_buffer}
    end
  end

  # when this flush message comes, ack the events and pass them down.
  # TODO: only flush when we haven't acked anything in @flush_time ms
  def handle_info(:flush, event_buffer) do
    event_count = Enum.count(event_buffer)

    if event_count > 0 do
      Logger.debug("ackker: timer flush called for #{event_count} events")
    end

    # ack all the events we will flush
    ack_all(event_buffer)

    # call this again in another @flush_time ms
    Process.send_after(self(), :flush, @flush_time)
    # move event_buffer to events, and reset the state
    {:noreply, event_buffer, []}
  end

  defp ack_all(events) do
    for {_event, meta} <- events do
      :ok = ack(meta.channel, meta.delivery_tag)
    end
  end

  defp ack(channel, delivery_tag) do
    try do
      Wabbit.Basic.ack(channel, delivery_tag)
    catch
      # swallow ack failures (e.g. a closed channel) so the stage keeps running
      _, _ -> :ok
    end
  end
end

This works great! I realise there is potential for data loss if the process is killed before the flush timer fires, but you have that problem with any sort of buffering.

My first question is, is there a better way to do this?

Now on to my next problem!

In addition to writing to a datastore, I would like to send the messages out to multiple websockets. Sometimes the consumers on the other end of a websocket are slow, and each of them has different performance characteristics. How can I drop messages to them if they aren’t keeping up? I am trying to avoid an unbounded buffer. I also want to write to the datastore at the same time, but I want the datastore to get every message and apply backpressure (via demand), as in the example above.

Any suggestions on a good pattern for this?

Thanks for taking the time to read through my whole question!

sasajuric

Author of Elixir in Action

I had a similar situation a long time ago, and I solved it by doing an adaptive buffering with two processes. One process enqueues incoming items, while another performs the write. The writer reports to the queue that it’s available. Then, as soon as the first item arrives, the queue process sends it to the writer, and marks that the writer is busy.

Now, while the writer is busy, the queue stores new items in its internal structure. Then, when the writer is done and reports back to the queue, the queue can send all the items to the writer at once, and the writer can store them at once. In this way, the writer is profiting from the fact that writing N items at once is way faster than N writes of a single item.

I’m not familiar with GenStage, but I feel that this should be doable using two stages (a producer-consumer for the queue, and a consumer for the writer).

The nice benefit of this approach is that buffering is adaptable. If writing is faster than the rate of incoming messages, then there’s no buffering. Otherwise, the buffer will expand to accommodate the incoming rate.
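Here is a minimal sketch of the queue/writer pair described above. It uses a plain GenServer plus a linked process rather than GenStage, so it runs without extra dependencies. `AdaptiveQueue`, `push/2` and the `writer_fun` callback are hypothetical names, and `writer_fun` stands in for whatever does the batched DB insert.

```elixir
defmodule AdaptiveQueue do
  # Queue side of the two-process pattern: items pass straight through while
  # the writer is idle and accumulate while it is busy.
  # Hypothetical names; writer_fun stands in for the batched DB write.
  use GenServer

  def start_link(writer_fun), do: GenServer.start_link(__MODULE__, writer_fun)

  # Called by whatever receives incoming items.
  def push(queue, item), do: GenServer.cast(queue, {:push, item})

  @impl true
  def init(writer_fun) do
    queue = self()
    writer = spawn_link(fn -> writer_loop(queue, writer_fun) end)
    # The writer starts out available, so the first item goes straight through.
    {:ok, %{writer: writer, busy?: false, buffer: []}}
  end

  @impl true
  def handle_cast({:push, item}, %{busy?: false} = state) do
    send(state.writer, {:batch, [item]})
    {:noreply, %{state | busy?: true}}
  end

  def handle_cast({:push, item}, state) do
    # Writer is busy: keep the item (prepended, reversed on hand-over).
    {:noreply, %{state | buffer: [item | state.buffer]}}
  end

  @impl true
  def handle_info(:writer_ready, %{buffer: []} = state) do
    {:noreply, %{state | busy?: false}}
  end

  def handle_info(:writer_ready, state) do
    # Hand over everything that piled up during the last write in one batch.
    send(state.writer, {:batch, Enum.reverse(state.buffer)})
    {:noreply, %{state | buffer: []}}
  end

  # The writer: wait for a batch, write it, report back that it is available.
  defp writer_loop(queue, writer_fun) do
    receive do
      {:batch, items} ->
        writer_fun.(items)
        send(queue, :writer_ready)
        writer_loop(queue, writer_fun)
    end
  end
end
```

With a slow writer, the first pushed item is written alone. Everything pushed while that write is in progress reaches the writer as a single batch, so the batch size grows with the incoming rate.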

Another nice benefit is that in the queue process you can do all sorts of trickery to handle overload, such as remove old items when the queue becomes large, reject new items if the queue is too large, or eliminate duplicate requests. I did all of those things in that project, and the result was pretty stable and resilient against all kinds of bursts, failures, latency increases and other problems.
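The overload handling can live in a small data structure that the queue process keeps as its buffer. The sketch below assumes a hypothetical `BoundedBuffer` module and an arbitrary `max`. It drops the oldest item once the buffer grows past `max` and ignores an item that is already queued. It is built on Erlang's `:queue` module.

```elixir
defmodule BoundedBuffer do
  # Hypothetical buffer for the queue process: drops the oldest item when
  # over capacity and skips duplicates of items already queued.
  defstruct queue: :queue.new(), size: 0, max: 1_000

  def new(max), do: %__MODULE__{max: max}

  def add(%__MODULE__{} = buf, item) do
    if :queue.member(item, buf.queue) do
      # Duplicate request: the copy already queued is enough.
      buf
    else
      buf = %{buf | queue: :queue.in(item, buf.queue), size: buf.size + 1}
      drop_oldest_if_full(buf)
    end
  end

  # Returns all buffered items (oldest first) and an emptied buffer.
  def take_all(%__MODULE__{} = buf) do
    {:queue.to_list(buf.queue), %{buf | queue: :queue.new(), size: 0}}
  end

  defp drop_oldest_if_full(%{size: size, max: max} = buf) when size > max do
    {{:value, _dropped}, rest} = :queue.out(buf.queue)
    %{buf | queue: rest, size: size - 1}
  end

  defp drop_oldest_if_full(buf), do: buf
end
```

`:queue.member/2` is linear in the queue length, so for large buffers a separate `MapSet` of queued items would make the duplicate check cheaper. To reject new items instead of dropping old ones, `add/2` would refuse the item once `size` has reached `max`.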

Again, I’m not familiar with GenStage, so not sure if there’s an out-of-the-box solution for that, or you need to work a bit for it, but I believe that it should be possible with GenStage.

peerreynders

Currently you seem to be on a fixed-period “flush cycle”. Process.send_after/4 returns a timer reference, which can be used with Process.cancel_timer/1 to cancel the previous timer whenever you release events in handle_events. In fact, you should be able to factor out a common function, shared by handle_events and handle_info, that releases the events and renews the timer.
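Here is a sketch of that refactoring. It uses a plain GenServer stand-in (`FlushBuffer`, with hypothetical `:max_size`, `:flush_ms` and `:flush_fun` options) so it runs without the gen_stage dependency. The size trigger and the timer trigger both end in one `release/1` function, which cancels the pending timer and starts a new one. Each timer message carries a fresh `make_ref/0` tag, so a timer that fired just before it was cancelled is ignored. In the GenStage version, `release/1` would instead return the buffered events, so that both `handle_events/3` and `handle_info/2` can end with `{:noreply, events, new_state}`.

```elixir
defmodule FlushBuffer do
  # Releases buffered events when max_size is reached or flush_ms has passed
  # since the last release, whichever comes first. flush_fun is a hypothetical
  # stand-in for "ack the events and pass them downstream".
  use GenServer

  def start_link(opts), do: GenServer.start_link(__MODULE__, opts)
  def push(pid, events), do: GenServer.cast(pid, {:events, events})

  @impl true
  def init(opts) do
    state = %{
      buffer: [],
      timer_ref: nil,
      tag: nil,
      max_size: Keyword.fetch!(opts, :max_size),
      flush_ms: Keyword.fetch!(opts, :flush_ms),
      flush_fun: Keyword.fetch!(opts, :flush_fun)
    }

    {:ok, schedule(state)}
  end

  @impl true
  def handle_cast({:events, events}, state) do
    state = %{state | buffer: state.buffer ++ events}

    if length(state.buffer) >= state.max_size do
      {:noreply, release(state)}
    else
      {:noreply, state}
    end
  end

  @impl true
  def handle_info({:flush, tag}, %{tag: tag} = state), do: {:noreply, release(state)}
  # A timer that fired just before being cancelled leaves a message with an old tag.
  def handle_info({:flush, _old_tag}, state), do: {:noreply, state}

  # Shared by the size trigger and the timer trigger.
  defp release(state) do
    if state.buffer != [], do: state.flush_fun.(state.buffer)
    schedule(%{state | buffer: []})
  end

  # Cancels the pending timer (if any) and starts a fresh one.
  defp schedule(state) do
    if state.timer_ref, do: Process.cancel_timer(state.timer_ref)
    tag = make_ref()
    timer_ref = Process.send_after(self(), {:flush, tag}, state.flush_ms)
    %{state | timer_ref: timer_ref, tag: tag}
  end
end
```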

I don’t know about patterns but based on your description I’d start with this:

  • Write a simple GenServer that is dedicated to a single WebSocket. The idea being that the GenServer’s mailbox becomes your “unbounded buffer” (i.e. let the VM handle it). The GenServer itself processes one message at a time and blocks until the WebSocket is finished sending the current message.
  • Have those “Observer” WebSocket GenServers register with an “Observable” GenServer (a reference to the Observer Pattern, but not in an OO way). The “Observable” GenServer simply sends all the events it receives to all the registered “Observers” (immediately and in turn).
  • Finally write a “Tap” GenStage. Essentially the “Tap” simply forwards all the events it receives - but not until it sends a copy to another process (in this case the “Observable” GenServer). Now it might be tempting to combine “Tap”/“Observable” but the priority is to get the events to the datastore, so by sticking to a simple “Tap” GenStage, the delay of getting the events to the datastore is minimized to the time it takes to put a copy of the events into the “Observable” mailbox. Distribution of the events happens on the “Observable’s” time and pushing messages into the WebSockets happens on the various “Observer’s” time. (The separation also creates the opportunity for the “Observable” process to be on a different CPU core than the “Tap” process.)
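Here is a minimal sketch of the “Observer”/“Observable” pair, assuming plain GenServers. `Observer.deliver/2`, `Observable.register/2`, `Observable.notify/2` and the per-socket `send_fun` are hypothetical names, and `send_fun` stands in for the actual WebSocket write. The “Tap” stage needs gen_stage, so it appears only as a comment.

```elixir
defmodule Observer do
  # One process per WebSocket; its mailbox is the (unbounded) buffer.
  use GenServer

  # send_fun is a hypothetical stand-in for "push one event down this socket".
  def start_link(send_fun), do: GenServer.start_link(__MODULE__, send_fun)
  def deliver(observer, events), do: GenServer.cast(observer, {:events, events})

  @impl true
  def init(send_fun), do: {:ok, send_fun}

  @impl true
  def handle_cast({:events, events}, send_fun) do
    # A slow socket blocks only this process; later events wait in its mailbox.
    Enum.each(events, send_fun)
    {:noreply, send_fun}
  end
end

defmodule Observable do
  # Fans every batch of events out to all registered observers.
  use GenServer

  def start_link(_opts \\ []), do: GenServer.start_link(__MODULE__, :ok)
  def register(observable, observer), do: GenServer.call(observable, {:register, observer})
  # Asynchronous, so the caller only pays for putting one message in a mailbox.
  def notify(observable, events), do: GenServer.cast(observable, {:notify, events})

  @impl true
  def init(:ok), do: {:ok, MapSet.new()}

  @impl true
  def handle_call({:register, observer}, _from, observers) do
    # Monitor so that an observer whose socket has closed is dropped automatically.
    Process.monitor(observer)
    {:reply, :ok, MapSet.put(observers, observer)}
  end

  @impl true
  def handle_cast({:notify, events}, observers) do
    Enum.each(observers, &Observer.deliver(&1, events))
    {:noreply, observers}
  end

  @impl true
  def handle_info({:DOWN, _ref, :process, pid, _reason}, observers) do
    {:noreply, MapSet.delete(observers, pid)}
  end
end

# The "Tap" needs the gen_stage dependency, so it is only sketched here.
# As a :producer_consumer, its callback would look roughly like:
#
#   def handle_events(events, _from, observable) do
#     Observable.notify(observable, events)
#     {:noreply, events, observable}
#   end
```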
