I’m building a platform with GenStage that sends financial stock notifications to subscribers, but each notification needs some ‘work’ first (to calculate what should go in it). I used this post by Discord as inspiration.
I have 3 steps: A -> B -> C
The ‘catch’ is that the total number of notifications I need to send is driven by the passing of time. I know the total that needs to be sent for the day, and it’s distributed evenly over 24 hours (or at the very least over x hours, which I know beforehand). Say I need to send 14 million that day over 1,440 minutes: that’s about 9,722 per minute. There won’t be spikes in sends (unless consumers go offline or something, of course).
In other words, these 3 steps together need to finish in under a minute, or notifications will back up:
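A quick check of the arithmetic above, as a small hypothetical Elixir helper (`Finance.Rate` and its functions are made-up names, not part of my code). 14,000,000 / 1,440 is about 9,722.2, so sending exactly 9,722 per minute would leave 320 notifications unsent at the end of the day; ceiling division gives 9,723 per minute instead. The second function multiplies a batch by the 5 data APIs from step B.

```elixir
defmodule Finance.Rate do
  # Hypothetical helper, not part of the original code.

  # Notifications to send per minute so that `total` fits into `hours`.
  # Ceiling division: flooring with div/2 would leave the remainder unsent.
  def per_minute(total, hours \\ 24) do
    minutes = hours * 60
    div(total + minutes - 1, minutes)
  end

  # Outbound data-API requests per minute for a given batch size.
  def api_requests(batch_size, apis \\ 5), do: batch_size * apis
end

IO.puts(Finance.Rate.per_minute(14_000_000))
IO.puts(Finance.Rate.api_requests(9_722))
```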
A: `Producer`
- Fetch the 9,722 users with the oldest “last sent at” timestamp from the database once every minute. I expect to have one `Producer`, because fetching this many records only takes a few milliseconds.
B: `ProducerConsumer`
- Take those 9,722 users and hit 5 different financial data APIs to determine what goes in each of their notifications, then pass the prepared notifications on to the `Consumer`s to send. I’ll probably need to open another help thread for this step, because I need to hit 5 different APIs for each of the 9,722 users, based on each person’s dashboard settings (5 * 9,722 = 48,610 requests in total). I expect to have one `ProducerConsumer` because, even though this step makes a lot of requests, all the APIs support `Keep-Alive`, so I suspect I can just run 5 async processes that each make a large number of requests at once. It’s possible I could need two `ProducerConsumer`s, though?
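One possible shape for step B, sketched with `Task.async_stream/3` from the standard library: it fans out one task per (subscriber, API) pair, so a batch of 9,722 users becomes 48,610 calls, while `max_concurrency` caps how many run at once. The API names, `fetch/2`, and the default concurrency of 100 are hypothetical stand-ins; a real version would make an HTTP request inside `fetch/2` and would probably be called from the `ProducerConsumer`’s `handle_events/3` on the batch it receives.

```elixir
defmodule Finance.Enricher do
  # Hypothetical names: the five data sources and fetch/2 are stand-ins,
  # not real APIs. fetch/2 is where an HTTP call would go.
  @apis [:quotes, :news, :earnings, :options, :ratings]

  def fetch(api, subscriber_id), do: "#{api} for #{subscriber_id}"

  # Runs one task per (subscriber, API) pair, i.e. 5 * length(ids) calls,
  # with at most `max_concurrency` in flight, then groups results per subscriber.
  def enrich_batch(subscriber_ids, max_concurrency \\ 100) do
    pairs = for id <- subscriber_ids, api <- @apis, do: {id, api}

    pairs
    |> Task.async_stream(fn {id, api} -> {id, api, fetch(api, id)} end,
      max_concurrency: max_concurrency,
      timeout: 30_000
    )
    |> Enum.reduce(%{}, fn {:ok, {id, api, data}}, acc ->
      Map.update(acc, id, %{api => data}, &Map.put(&1, api, data))
    end)
  end
end
```

Matching `max_concurrency` to the size of the HTTP client’s keep-alive connection pool seems like a reasonable starting point, but it is an assumption to tune rather than a rule.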
C: `Consumer`
- I will have 10 or more `Consumer`s. Their job is to take the prepared notifications and send each one by hitting the Google FCM XMPP API once per notification. This XMPP API is asynchronous when sending: after some time, the `Consumer` receives a message from the Romeo library saying the API responded with either `NACK` or `ACK`. There can be at most 100 unacknowledged requests at once per connection, with a maximum of 1,000 connections.
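The 100-unacknowledged-requests limit in step C behaves like a sliding window per connection. A minimal, hypothetical sketch of that bookkeeping in plain Elixir (no Romeo calls; `Finance.InFlight`, `enqueue/2`, and `settle/2` are invented names): notifications beyond the window wait in a `:queue`, and every ACK or NACK frees one slot.

```elixir
defmodule Finance.InFlight do
  # Hypothetical per-connection bookkeeping: at most @window notifications
  # may be awaiting an ACK/NACK at the same time.
  @window 100

  defstruct pending: MapSet.new(), backlog: :queue.new()

  def new, do: %__MODULE__{}

  # Queue {id, payload} notifications and return the ones that may be sent
  # right now without exceeding the window.
  def enqueue(%__MODULE__{} = s, notifications) do
    backlog = Enum.reduce(notifications, s.backlog, &:queue.in/2)
    fill(%{s | backlog: backlog}, [])
  end

  # An ACK or NACK for `id` frees one slot; return whatever can now be sent.
  def settle(%__MODULE__{} = s, id) do
    fill(%{s | pending: MapSet.delete(s.pending, id)}, [])
  end

  defp fill(s, acc) do
    if MapSet.size(s.pending) < @window do
      case :queue.out(s.backlog) do
        {{:value, {id, _payload} = n}, rest} ->
          fill(%{s | pending: MapSet.put(s.pending, id), backlog: rest}, [n | acc])

        {:empty, _} ->
          {Enum.reverse(acc), s}
      end
    else
      {Enum.reverse(acc), s}
    end
  end
end
```

In a `Consumer`, `enqueue/2` would be called from `handle_events/3` and `settle/2` from the `handle_info/2` clause that receives the ACK/NACK message; the exact message shape depends on Romeo and is not shown here.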
So here is my struggle… I am having trouble fully understanding GenStage.
- I understand the easy examples with `handle_demand` and `handle_events` where demand is met 100% of the time. It gets confusing when the producer can’t deliver enough data to the next stage and the whole pipeline “halts”. Then all these extra options and functions come into play, like `:manual`, `sync_notify`, `handle_call`, `handle_subscribe`, `handle_info`, and more. On top of that there are demand buffers and event buffers, use of `:queue`, etc.
- I don’t get the whole “ignore demand” thing… I thought the whole point is that consumers ask for events?
- Most of the harder examples seem to handle only “one” event at a time, like this one. In my use case, I’m pretty sure I need to do all of this in batches, or it will never finish within 1 minute (e.g. taking 1 record out of the DB, hitting 5 API endpoints for that record, and then sending that one record).
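On the batching point: `handle_events/3` always receives a list of events rather than a single event, and the size of each list is bounded by the subscription’s `max_demand`. A small demo, assuming `gen_stage ~> 1.2` installed via `Mix.install/1` (`BatchSizeConsumer` and `BatchDemo` are hypothetical names), that feeds 1,000 integers through `GenStage.from_enumerable/2` and records the size of every batch the consumer sees:

```elixir
Mix.install([{:gen_stage, "~> 1.2"}])

defmodule BatchSizeConsumer do
  # Hypothetical demo consumer: reports how many events each
  # handle_events/3 call received, to show that events arrive as lists.
  use GenStage

  def start_link(parent), do: GenStage.start_link(__MODULE__, parent)

  @impl true
  def init(parent), do: {:consumer, parent}

  @impl true
  def handle_events(events, _from, parent) do
    send(parent, {:batch, length(events)})
    {:noreply, [], parent}
  end
end

defmodule BatchDemo do
  # Collects reported batch sizes until `target` events have been seen.
  def collect(seen, target, sizes) when seen >= target, do: Enum.reverse(sizes)

  def collect(seen, target, sizes) do
    receive do
      {:batch, n} -> collect(seen + n, target, [n | sizes])
    after
      5_000 -> raise "timed out waiting for events"
    end
  end
end

{:ok, producer} = GenStage.from_enumerable(1..1_000)
{:ok, consumer} = BatchSizeConsumer.start_link(self())
{:ok, _tag} = GenStage.sync_subscribe(consumer, to: producer, max_demand: 100, min_demand: 50)
sizes = BatchDemo.collect(0, 1_000, [])
IO.inspect(sizes, label: "batch sizes")
```

So a stage can work on a whole slice of users per callback; `max_demand` and `min_demand` are the knobs that shape those slices.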
Here is what I have so far… I’m not sure how far off I am.
I THINK my next step is going to be “buffering demand”, right? That’s why it just ‘halts’: it asks for more, I send less, and I never send more after that… Similar to this example. After that, I need to feed in the users at a regular interval, but I’m not sure where to ‘inject’ them…
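One common way to combine buffered demand with a fixed schedule, sketched under assumptions: the producer keeps the demand it has received but not yet satisfied, a timer message handled in `handle_info/2` is where new rows get ‘injected’, and both callbacks go through the same dispatch step. `Finance.TickProducer` and `fetch_batch/2` are hypothetical (the latter replaces the SQL query with sequential integers), and `gen_stage ~> 1.2` is assumed via `Mix.install/1`.

```elixir
Mix.install([{:gen_stage, "~> 1.2"}])

defmodule Finance.TickProducer do
  # Hypothetical sketch: fetch a fixed-size batch on a timer and hand events
  # downstream only as demand arrives.
  use GenStage

  def start_link(opts), do: GenStage.start_link(__MODULE__, opts)

  @impl true
  def init(opts) do
    state = %{
      batch_size: Keyword.fetch!(opts, :batch_size),
      interval_ms: Keyword.get(opts, :interval_ms, 60_000),
      queue: :queue.new(),  # fetched rows not yet sent downstream
      demand: 0,            # demand received but not yet satisfied
      next_id: 0
    }

    # First fetch right away; later ones are scheduled in handle_info/2.
    send(self(), :tick)
    {:producer, state}
  end

  @impl true
  def handle_demand(incoming, state) do
    # No fetching here: remember the demand and serve what is buffered.
    dispatch(%{state | demand: state.demand + incoming})
  end

  @impl true
  def handle_info(:tick, state) do
    Process.send_after(self(), :tick, state.interval_ms)
    rows = fetch_batch(state.next_id, state.batch_size)
    queue = Enum.reduce(rows, state.queue, &:queue.in/2)
    dispatch(%{state | queue: queue, next_id: state.next_id + length(rows)})
  end

  # Stand-in for the database query: returns sequential integer ids.
  defp fetch_batch(first, count), do: Enum.to_list(first..(first + count - 1)//1)

  # Emit up to `demand` buffered events and keep the remainder for later.
  defp dispatch(state) do
    {events, queue, demand} = take(state.queue, state.demand, [])
    {:noreply, events, %{state | queue: queue, demand: demand}}
  end

  defp take(queue, 0, acc), do: {Enum.reverse(acc), queue, 0}

  defp take(queue, demand, acc) do
    case :queue.out(queue) do
      {{:value, event}, rest} -> take(rest, demand - 1, [event | acc])
      {:empty, rest} -> {Enum.reverse(acc), rest, demand}
    end
  end
end
```

Because rows are only fetched on the timer, a consumer that falls behind makes the queue grow instead of triggering extra database queries; in production the queue would probably need a size limit.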
Aside from that, how far off am I?
Producer
```elixir
defmodule Finance.PushScheduler do
  use GenStage

  alias Ecto.Adapters.SQL
  alias GoNow.Repo

  @max_buffer_size 10_000

  # Selects up to $1 subscribers with the oldest last_pushed_at (never-pushed
  # first), stamps them with now(), and returns them in one statement.
  @sql """
  WITH cte AS (
    SELECT id
    FROM subscribers
    ORDER BY last_pushed_at ASC NULLS FIRST
    LIMIT $1
  )
  UPDATE subscribers s
  SET last_pushed_at = now()
  FROM cte
  WHERE s.id = cte.id
  RETURNING s.id, s.token, s.ip, s.user_agent;
  """

  # client

  def start_link(_state) do
    GenStage.start_link(__MODULE__, [], name: __MODULE__)
  end

  # server

  def init(state) do
    {:producer, state, buffer_size: @max_buffer_size}
  end

  def handle_demand(demand, state) do
    {:ok, %{rows: subscribers}} = SQL.query(Repo, @sql, [demand])
    IO.puts "DEMANDED: #{demand}"
    IO.puts "SUPPLIED: #{length(subscribers)}"
    {:noreply, subscribers, state}
  end
end
```
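The `buffer_size` option in the producer above only matters when a producer emits more events than were demanded. That is what “ignoring demand” refers to: `handle_demand/2` returns no events, events are emitted from another callback (here a timer in `handle_info/2`), and GenStage’s internal buffer holds them until consumers ask. A hypothetical sketch, assuming `gen_stage ~> 1.2` via `Mix.install/1`; `Finance.IgnoreDemandProducer` is not part of the original code:

```elixir
Mix.install([{:gen_stage, "~> 1.2"}])

defmodule Finance.IgnoreDemandProducer do
  # Hypothetical sketch of the "ignore demand" style: events are pushed out
  # on a timer regardless of demand, and GenStage's internal buffer holds
  # whatever consumers have not asked for yet.
  use GenStage

  def start_link(opts), do: GenStage.start_link(__MODULE__, opts)

  @impl true
  def init(opts) do
    batch_size = Keyword.fetch!(opts, :batch_size)
    interval_ms = Keyword.get(opts, :interval_ms, 60_000)
    send(self(), :tick)

    # Buffer sized above one batch so a full batch fits while consumers catch up.
    {:producer, %{batch_size: batch_size, interval_ms: interval_ms, next_id: 0},
     buffer_size: batch_size * 2}
  end

  @impl true
  def handle_demand(_demand, state) do
    # Demand is ignored: events only come from the timer below.
    {:noreply, [], state}
  end

  @impl true
  def handle_info(:tick, state) do
    Process.send_after(self(), :tick, state.interval_ms)
    first = state.next_id
    batch = Enum.to_list(first..(first + state.batch_size - 1)//1)
    {:noreply, batch, %{state | next_id: first + state.batch_size}}
  end
end
```

If the buffer overflows, GenStage discards events (by default it keeps the most recent ones, per `buffer_keep: :last`) and logs a warning, so with 9,722 rows per minute the 10,000 used above leaves little headroom.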
ProducerConsumer
```elixir
defmodule Finance.DataAggregator do
  use GenStage

  @max_demand 100

  # client

  def start_link(_state) do
    GenStage.start_link(__MODULE__, [], name: __MODULE__)
  end

  # server

  def init(state) do
    {:producer_consumer, state,
     subscribe_to: [{Finance.PushScheduler, max_demand: @max_demand}]}
  end

  # Currently a pass-through: subscribers are forwarded unchanged.
  def handle_events(subscribers, _from, state) do
    {:noreply, subscribers, state}
  end
end
```
Consumer
```elixir
defmodule Finance.PushSender do
  use GenStage

  @max_demand 100

  # client

  def start_link(_state) do
    GenStage.start_link(__MODULE__, [])
  end

  # server

  def init(state) do
    {:consumer, state,
     subscribe_to: [{Finance.DataAggregator, max_demand: @max_demand}]}
  end

  # Currently discards the events; nothing is sent yet.
  def handle_events(_messages, _from, state) do
    {:noreply, [], state}
  end
end
```
application.ex
```elixir
defmodule GoNow.Application do
  use Application

  def start(_type, _args) do
    children = [
      {Finance.PushScheduler, []},
      {Finance.DataAggregator, []},
      Supervisor.child_spec(Finance.PushSender, id: 1),
      Supervisor.child_spec(Finance.PushSender, id: 2),
      Supervisor.child_spec(Finance.PushSender, id: 3),
      Supervisor.child_spec(Finance.PushSender, id: 4),
      Supervisor.child_spec(Finance.PushSender, id: 5)
    ]

    opts = [strategy: :one_for_one, name: GoNow.Supervisor]
    Supervisor.start_link(children, opts)
  end
end
```
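Since step C calls for 10 or more `Consumer`s, the repeated `Supervisor.child_spec/2` lines could be generated instead. A small hypothetical helper (`Finance.ConsumerSpecs` is an invented name) that gives each copy of the same child a distinct id, which the supervisor requires:

```elixir
defmodule Finance.ConsumerSpecs do
  # Hypothetical helper: builds `count` child specs for the same child
  # (e.g. {Finance.PushSender, []}), each with a distinct id so the
  # supervisor accepts several copies of one module.
  def build(child, count) when is_integer(count) and count > 0 do
    for i <- 1..count do
      Supervisor.child_spec(child, id: {:consumer, i})
    end
  end
end
```

In `GoNow.Application` this might read `children = [{Finance.PushScheduler, []}, {Finance.DataAggregator, []}] ++ Finance.ConsumerSpecs.build({Finance.PushSender, []}, 10)`, which keeps the producer and producer-consumer first so they are running before the consumers subscribe to them.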