I am stuck on using GenStage

I’m building a platform using GenStage that sends financial stock notifications to subscribers, where each notification requires some ‘work’ first (to calculate what should go in it). I used this post by Discord as inspiration.

I have 3 steps, A -> B -> C

The ‘catch’ is that the total # of notifications I need to send is determined by the passage of time. I know the day’s total in advance, and it’s distributed evenly over 24 hours (or at least over x hours, which I also know beforehand). Say I need to send 14 million on a given day: that’s 14,000,000 / 1,440 minutes ≈ 9,722 per minute over 24 hours. There won’t be spikes in sends (unless consumers go offline or something, of course).
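As a quick sanity check, the per-minute budget follows directly from the daily total (numbers from above):

```elixir
# Daily notification budget spread evenly over 24 hours (1,440 minutes).
daily_total = 14_000_000
minutes = 24 * 60            # 1_440 minutes in a day

per_minute = div(daily_total, minutes)
IO.puts(per_minute)          # 9722 — the ~9,722/minute budget
```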

In other words, these 3 steps need to happen together in under a minute total, or notifications will back up:

A: Producer - Fetch the 9,722 oldest (by last sent at) users from the database once every minute. I expect to have one Producer, because fetching this many records takes only a few milliseconds.

B: ProducerConsumer - Take those 9,722 users and hit 5 different financial data APIs to determine what goes in each of their notifications, then pass the prepared notifications on to the Consumers for sending. For this step I’ll probably need to open another help thread, because I need to hit 5 different APIs 9,722 times each, based on each person’s settings in their dashboard (5 * 9,722 = 48,610 total requests). I expect to have one ProducerConsumer, because even though this step makes a lot of requests, all the APIs support Keep-Alive, so I suspect I can just run 5 async processes that each make a ton of requests at once. It’s possible I could need two ProducerConsumers, though.
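The fan-out in step B can be sketched with `Task.async_stream`, which caps how many requests are in flight at once. The API names and `fetch_quote` function below are made up stand-ins for the real HTTP calls:

```elixir
# Hypothetical stand-in for one of the five financial-data API calls.
# In the real pipeline this would be an HTTP request over a keep-alive
# connection; here it just tags its input so the shape is visible.
fetch_quote = fn api, user_id -> {api, user_id, :ok} end

apis = [:quotes, :news, :fundamentals, :fx, :crypto]  # invented names
users = 1..4                                          # tiny sample

# For each user, hit all five APIs concurrently, capped at 50 in-flight
# tasks so one slow endpoint cannot pile up unbounded work.
prepared =
  for(user <- users, api <- apis, do: {user, api})
  |> Task.async_stream(fn {user, api} -> fetch_quote.(api, user) end,
    max_concurrency: 50,
    timeout: 5_000
  )
  |> Enum.map(fn {:ok, result} -> result end)

length(prepared)  # 20 results: 4 users x 5 APIs
```

In a real ProducerConsumer the results would be grouped back per user into one prepared notification before emitting them as events.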

C: Consumer - I will have 10 or more Consumers. Their job is to take the prepared notifications and send them by hitting the Google FCM XMPP API once per notification. This XMPP API is asynchronous when sending: after some time, the Consumer receives a message from the Romeo library saying the API responded with either an ACK or a NACK. There can be at most 100 unacknowledged requests at once per connection, with 1,000 connections max.
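The 100-unacknowledged-per-connection limit in step C amounts to a counting window per connection. A minimal, GenStage-free sketch of that bookkeeping (module name invented, not an FCM client):

```elixir
defmodule InflightWindow do
  @moduledoc """
  Sketch of an XMPP-style send window: at most `limit` sends may be
  awaiting ACK/NACK at any moment. Only the counting, nothing else.
  """
  defstruct in_flight: 0, limit: 100

  # Returns {:ok, window} if another send fits in the window, :full otherwise.
  def try_send(%__MODULE__{in_flight: n, limit: l}) when n >= l, do: :full
  def try_send(%__MODULE__{in_flight: n} = w), do: {:ok, %{w | in_flight: n + 1}}

  # Called when the server ACKs or NACKs a message: frees one slot.
  def ack(%__MODULE__{in_flight: n} = w) when n > 0, do: %{w | in_flight: n - 1}
end
```

A Consumer holding a connection would call `try_send/1` before each push and `ack/1` from the `handle_info` that receives Romeo’s ACK/NACK message, pausing sends whenever the window reports `:full`.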


So here is my struggle… I am having trouble fully understanding GenStage.

  • I understand the easy examples with handle_demand and handle_events where demand is fully met at all times. It gets confusing when the producer can’t deliver enough data to the next stage and the whole pipeline “halts”. Then all these extra options and functions come into play: :manual mode, sync_notify, handle_call, handle_subscribe, handle_info, and more. On top of that there are demand buffers and event buffers, use of :queue, etc.

  • I don’t get the whole “ignore demand” thing… I thought the whole point is that consumers ask for demand?

  • Most of the harder examples seem to handle only “one” event at a time, like this one. In my use case, I’m pretty sure I need to do all of this in batches, or it’s never going to complete within 1 minute (versus taking 1 record out of the DB, hitting 5 API endpoints for that record, and then sending that one record).

Here is what I have so far… I’m not sure how far off I am.

I THINK my next step is going to be “buffering demand”, right? That’s why it just ‘halts’: it asks for more, I send less, and I never send more after that… similar to this example. After that, I need to feed in the users at a regular interval, but I’m not sure where to ‘inject’ that…

Aside from that, how far off am I?

Producer

defmodule Finance.PushScheduler do
  use GenStage

  alias Ecto.Adapters.SQL
  alias GoNow.Repo

  @max_buffer_size 10_000

  @sql """
    WITH cte AS (
      SELECT id
      FROM subscribers
      ORDER BY last_pushed_at ASC NULLS FIRST
      LIMIT $1
    )
    UPDATE subscribers s
    SET last_pushed_at = now()
    FROM cte
    WHERE s.id = cte.id
    RETURNING s.id, s.token, s.ip, s.user_agent;
  """

  # client

  def start_link(_state) do
    GenStage.start_link(__MODULE__, [], name: __MODULE__)
  end

  # server

  def init(state) do
    {:producer, state, buffer_size: @max_buffer_size}
  end

  # NOTE: this fulfils each demand with a fresh query, but if the query
  # returns fewer rows than `demand`, the shortfall is never made up —
  # unmet demand would have to be tracked manually.
  def handle_demand(demand, state) do
    {:ok, %{rows: subscribers}} = SQL.query(Repo, @sql, [demand])

    IO.puts("DEMANDED: #{demand}")
    IO.puts("SUPPLIED: #{length(subscribers)}")

    {:noreply, subscribers, state}
  end
end

ProducerConsumer

defmodule Finance.DataAggregator do
  use GenStage

  @max_demand 100

  # client

  def start_link(_state) do
    GenStage.start_link(__MODULE__, [], name: __MODULE__)
  end

  # server

  def init(state) do
    {:producer_consumer, state,
     subscribe_to: [{Finance.PushScheduler, max_demand: @max_demand}]}
  end

  def handle_events(subscribers, _from, state) do
    # Pass-through for now — this is where the 5 API calls per subscriber
    # would enrich each event into a prepared notification.
    {:noreply, subscribers, state}
  end
end

Consumer

defmodule Finance.PushSender do
  use GenStage

  @max_demand 100

  # client

  def start_link(_state) do
    GenStage.start_link(__MODULE__, [])
  end

  # server

  def init(state) do
    {:consumer, state,
     subscribe_to: [{Finance.DataAggregator, max_demand: @max_demand}]}
  end

  def handle_events(_messages, _from, state) do
    # No-op for now — this is where each prepared notification would be
    # sent over the FCM XMPP connection.
    {:noreply, [], state}
  end
end

application.ex

defmodule GoNow.Application do
  use Application

  def start(_type, _args) do
    children =
      [
        {Finance.PushScheduler, []},
        {Finance.DataAggregator, []}
        # Five identical senders need distinct child ids.
      ] ++
        for id <- 1..5 do
          Supervisor.child_spec(Finance.PushSender, id: {Finance.PushSender, id})
        end

    opts = [strategy: :one_for_one, name: GoNow.Supervisor]

    Supervisor.start_link(children, opts)
  end
end

EDIT: I think I just had a fairly large epiphany. I will post what I was confused about, for anyone else who might stumble upon this. I need to make sure I’m seeing this correctly first.


OK, these are my revelations so far. I THINK I’m correct on all of these; please correct me if I’m wrong:

  • There are actually TWO ways to dispatch events to the consumer. In BOTH cases, the events that get sent are the 2nd element of the callback’s return value, {:noreply, [events], state}:
    1. handle_demand dispatches immediately, in reply to the consumer’s ask.
    2. handle_info, handle_call and handle_cast also dispatch immediately, but as a “send” to the consumer rather than “OK, you asked me, this is what I have”.
  • If you use handle_demand and produce fewer events than the consumer asked for, the pipeline will ‘stop’ or halt, because a consumer doesn’t ask twice: it sits there waiting, assuming you just gave it everything you had. The amount you send is also not reconciled against outstanding demand for you. In other words, if you use handle_demand, you must keep track of unfulfilled demand yourself.
  • The other option is to ignore handle_demand by just returning []. Doing so tells the consumer you have no work, so it will stop asking. Now it’s up to you to ‘push’ the events. It also means that when you decide to push events, you might send more than the consumer can handle, in which case the internal buffer plays a very important role: that’s where events go when there are more than max_demand.
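The bookkeeping described above — remember unmet demand, queue incoming events, dispatch the minimum of the two — is pure data manipulation, so it can be sketched without GenStage at all, using Erlang’s :queue (module name invented):

```elixir
defmodule DemandBuffer do
  @moduledoc """
  Sketch of the manual bookkeeping a producer does when it cannot
  satisfy demand immediately: track pending demand, queue events,
  and dispatch min(pending, queued) whenever either side changes.
  Each function returns {events_to_dispatch, new_state}.
  """
  defstruct queue: :queue.new(), pending: 0

  # A consumer asked for `n` more events.
  def add_demand(%__MODULE__{} = s, n), do: dispatch(%{s | pending: s.pending + n})

  # New events arrived (e.g. from the minutely DB fetch).
  def add_events(%__MODULE__{} = s, events) do
    queue = Enum.reduce(events, s.queue, &:queue.in/2)
    dispatch(%{s | queue: queue})
  end

  # Emit as many events as both sides allow.
  defp dispatch(%__MODULE__{queue: q, pending: p} = s) do
    take = min(p, :queue.len(q))
    {out, rest} = :queue.split(take, q)
    {:queue.to_list(out), %{s | queue: rest, pending: p - take}}
  end
end
```

In a real producer, `add_demand/2` would be called from handle_demand and `add_events/2` from a timer-driven handle_info, with the returned events going into the {:noreply, events, state} tuple.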

Things I still don’t get / need to research…

  • GenStage.ask
  • :manual mode

You’re correct.

I’d just not say there are two ways of dispatching events; there is only one: each GenStage callback can dispatch events. One callback just happens to be triggered by a consumer asking for more work. handle_demand is needed for accepting demand, but it does not need to send events — that can happen in any other callback as well.

Also correct is that a producer is responsible for keeping track of how many events were demanded. The buffer in GenStage is, as far as I know, just a convenience for absorbing small differences, and I wouldn’t rely on it too much.


OK, those are all very good points, especially that handling demand doesn’t necessarily have to dispatch…

Can you explain more about what you mean regarding the internal buffer? Somewhere in the docs I saw an example that hand-rolled a buffer using :queue, but then it says “the built-in buffer is way more flexible”.

Are you saying it’s not good to use the buffer? I currently have it set to 500_000.

And speaking of buffers, if I use max_buffer_size, does that mean old events can ‘hang around’ in there, kind of like how events hang around on consumers until more events come in and then they all get processed? It’s important in my app that nothing goes stale, so I want to make sure that anything in the buffer gets sent by the producer immediately.