GenServer + DynamicSupervisor to track streaming workers per topic?

Hey there,

I am building a system that:

  • streams data from an external HTTP API endpoint for multiple topics
  • starts streaming processes under a supervisor
  • allows runtime introspection of which topics are currently streaming (optionally also which ones errored)

The external endpoint accepts a parameter for passing multiple topics, though the number of topics per request is limited. To stream additional topics, I need to make a new request.

Something like StreamerManager.start(topics) would launch one or more streamer processes for topics that are not streaming yet. Topics that are already streamed by some process are skipped. FWIW, I already implemented the Streamer + Consumers using GenStage but this is likely irrelevant to my question.
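A minimal sketch of the filtering and batching that StreamerManager.start(topics) would need, assuming the set of already-streaming topics is available as a MapSet. StreamerPlan.batches/3 and max_per_request are hypothetical names; the real per-request limit depends on the API.

```elixir
defmodule StreamerPlan do
  # Hypothetical helper: drops duplicates and topics that are already being
  # streamed, then splits the rest into batches of at most `max_per_request`
  # topics. Each batch would become one request, i.e. one streamer process.
  @spec batches([String.t()], MapSet.t(String.t()), pos_integer()) :: [[String.t()]]
  def batches(requested, streaming, max_per_request)
      when is_integer(max_per_request) and max_per_request > 0 do
    requested
    |> Enum.uniq()
    |> Enum.reject(&MapSet.member?(streaming, &1))
    |> Enum.chunk_every(max_per_request)
  end
end
```

For example, StreamerPlan.batches(["a", "b", "c", "d"], MapSet.new(["b"]), 2) returns [["a", "c"], ["d"]], so two streamer processes would be started.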

I am considering a GenServer that tracks %{:topic_name => pid()} in its state and calls DynamicSupervisor to launch streamers as child processes with the appropriate args. However, the stored PIDs become stale when a streamer crashes and is restarted by the supervisor (I'm not sure they even matter if I rely on a supervisor).

Am I off track here, or what would be the appropriate architecture?
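The GenServer + DynamicSupervisor idea from the question could look roughly like the sketch below. It assumes streamers are started with restart: :temporary and that the manager monitors each one, so a crashed streamer's topics are removed instead of leaving a stale PID behind. FakeStreamer and PidTrackingManager are hypothetical names.

```elixir
defmodule FakeStreamer do
  # Hypothetical stand-in for the real GenStage streamer.
  # restart: :temporary means the DynamicSupervisor will NOT restart it,
  # so the manager never ends up with a PID it does not know about.
  use GenServer, restart: :temporary

  def start_link(topics), do: GenServer.start_link(__MODULE__, topics)

  @impl true
  def init(topics), do: {:ok, topics}
end

defmodule PidTrackingManager do
  # Hypothetical manager: tracks %{topic => pid} and monitors each streamer.
  use GenServer

  def start_link(sup), do: GenServer.start_link(__MODULE__, sup, name: __MODULE__)
  def start(topics), do: GenServer.call(__MODULE__, {:start, topics})
  def streaming, do: GenServer.call(__MODULE__, :streaming)

  @impl true
  def init(sup), do: {:ok, %{sup: sup, topics: %{}}}

  @impl true
  def handle_call({:start, topics}, _from, state) do
    # Skip topics that some streamer already owns.
    new_topics = topics |> Enum.uniq() |> Enum.reject(&Map.has_key?(state.topics, &1))

    if new_topics == [] do
      {:reply, :noop, state}
    else
      {:ok, pid} = DynamicSupervisor.start_child(state.sup, {FakeStreamer, new_topics})
      # The monitor delivers {:DOWN, ...} when the streamer exits for any reason.
      Process.monitor(pid)
      owned = Enum.reduce(new_topics, state.topics, &Map.put(&2, &1, pid))
      {:reply, {:ok, pid}, %{state | topics: owned}}
    end
  end

  def handle_call(:streaming, _from, state), do: {:reply, state.topics, state}

  @impl true
  def handle_info({:DOWN, _ref, :process, pid, _reason}, state) do
    # Drop every topic owned by the dead streamer so no stale PID remains.
    owned = for {topic, owner} <- state.topics, owner != pid, into: %{}, do: {topic, owner}
    {:noreply, %{state | topics: owned}}
  end
end
```

The catch is the one raised in the question: with a restarting child spec, the supervisor would start a replacement whose PID the manager never sees, so the manager would have to own restarts itself. A registry avoids that because a restarted process registers itself again.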

It sounds like you may be imagining something very similar to Registry and via tuples; would that save you some wheel reinvention? Registry automatically deregisters processes when they terminate.

It is single-node only (not distributed), but in my experience it scales very well, especially if you're on an Elixir version new enough to enable the partitioning support (the :partitions option).
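A small sketch of the Registry + via-tuple combination, assuming a unique registry named MyApp.TopicRegistry and a hypothetical one-topic-per-process ViaStreamer. Starting a second process under the same key fails with {:error, {:already_started, pid}}, and the entry disappears once the process terminates.

```elixir
defmodule ViaStreamer do
  # Hypothetical stand-in for a single-topic streamer named via the Registry.
  use GenServer

  # A via tuple has the shape {:via, Registry, {registry_name, key}};
  # MyApp.TopicRegistry is an assumed registry name.
  def via(topic), do: {:via, Registry, {MyApp.TopicRegistry, topic}}

  def start_link(topic), do: GenServer.start_link(__MODULE__, topic, name: via(topic))

  @impl true
  def init(topic), do: {:ok, topic}
end
```

In a supervision tree the registry would be started as {Registry, keys: :unique, name: MyApp.TopicRegistry, partitions: System.schedulers_online()}, where :partitions is the option that enables partitioning. Registry.lookup(MyApp.TopicRegistry, "prices") then answers "is this topic streaming, and by whom?".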


Registry looks almost perfect for the job. Can’t believe I didn’t see this before.

With via, I cannot register a process with multiple keys. I could use the topic list as a key, but then it would be hard to look up a single topic’s process without iterating through all keys.

Would it make sense to do something like this? Is there anything that speaks against registering inside an init callback? I have slight doubts about the coupling between Streamer and TopicRegistry, though maybe that is not much of an issue.

defmodule Streamer do
  use GenStage

  def init(topics) do
    for topic <- topics do
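      # value does not matter at the moment; the return value is currently
      # ignored, so a topic already held by another streamer
      # ({:error, {:already_registered, pid}}) goes unnoticed
      Registry.register(MyApp.TopicRegistry, topic, nil)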
    end

    # ...
    {:producer, ...}
  end
end
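A runnable approximation of the snippet above, using a plain GenServer as a stand-in for the GenStage producer (the Registry calls are the same). It shows that one process can hold several unique keys via Registry.register/3, and one possible way to handle a topic that is already taken. MultiTopicStreamer and the choice of returning :ignore are assumptions, not the original code.

```elixir
defmodule MultiTopicStreamer do
  # Hypothetical GenServer stand-in for the GenStage Streamer.
  use GenServer

  def start_link(topics), do: GenServer.start_link(__MODULE__, topics)

  @impl true
  def init(topics) do
    # Registry.register/3 registers the *calling* process, so it has to run
    # inside the streamer itself. One process may hold many unique keys.
    results =
      topics
      |> Enum.uniq()
      |> Enum.map(&Registry.register(MyApp.TopicRegistry, &1, nil))

    case Enum.find(results, &match?({:error, _}, &1)) do
      nil ->
        {:ok, topics}

      {:error, {:already_registered, _owner}} ->
        # Another streamer already owns one of these topics. Returning :ignore
        # makes start_link/1 return :ignore; the process exits normally and the
        # Registry drops any keys it did manage to register.
        :ignore
    end
  end
end
```

Registry.keys(MyApp.TopicRegistry, pid) then gives the reverse lookup (all topics held by a given streamer), which covers the introspection requirement from the question.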

I don’t see any concerns with this design off the top of my head, though in a standard GenServer I would possibly move the logic into a handle_continue instead of directly in init.



My understanding is that any work done in init blocks the Supervisor (start_link does not return until init does), so it is better to offload expensive setup to handle_continue. Unlike earlier workarounds to that concern, such as the process sending a message to itself from init, a continue is guaranteed to run before the process handles any message from its mailbox.
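For a plain GenServer, the handle_continue variant might look like this (a sketch; ContinueStreamer is a hypothetical name). init/1 returns immediately, and the registration runs before the process handles any other message, so a GenServer.call issued right after start_link is only answered once the topics are registered.

```elixir
defmodule ContinueStreamer do
  # Hypothetical GenServer variant: init/1 returns at once and the
  # registration runs in handle_continue/2, before any other message.
  use GenServer

  def start_link(topics), do: GenServer.start_link(__MODULE__, topics)

  @impl true
  def init(topics), do: {:ok, %{topics: topics}, {:continue, :register}}

  @impl true
  def handle_continue(:register, state) do
    for topic <- state.topics do
      # Let it crash on a conflict: the {:ok, _} match fails if another
      # process already holds the topic, and the supervisor takes over.
      {:ok, _owner} = Registry.register(MyApp.TopicRegistry, topic, nil)
    end

    # Other expensive setup (e.g. opening the HTTP stream) would also go here.
    {:noreply, state}
  end

  @impl true
  def handle_call(:topics, _from, state), do: {:reply, state.topics, state}
end
```

One trade-off: start_link/1 no longer reports a taken topic as a start error; the conflict surfaces later as a crash in handle_continue/2.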

offload expensive setup to handle_continue

This is not possible with GenStage, unfortunately.

Edit: There is an open PR to GenStage that implements :continue and handle_continue: "handle continue support" by maartenvanvliet, pull request #300 on elixir-lang/gen_stage (GitHub).

Interesting. I haven’t had an opportunity to use GenStage yet. Then I guess init will have to do!

[edit: Yay for the handle_continue PR, but it looks like it has been sitting there a while, and another PR before it has been waiting even longer!]

Here is what I am using now (thanks @shanesveller for the inspiration):

  • Start Registry in supervisor tree: {Registry, keys: :unique, name: MyApp.TopicRegistry}

  • When starting a :producer / “TopicStreamer” GenStage with topics:

    • fetch all keys from the Registry (each key is a list of topics, hence the flatten):
      Registry.select(MyApp.TopicRegistry, [{{:"$1", :_, :_}, [], [:"$1"]}])
      |> List.flatten()
      |> MapSet.new()

    • Enum.reject/2 every requested topic that is already in that set of keys
    • MyApp.TopicStreamer.start_link(name: {:via, Registry, {MyApp.TopicRegistry, topics}}), where topics is the filtered list

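This ensures that there are no TopicStreamers with overlapping topics, as long as streamers are started from one process at a time (the select and the start are not a single atomic step).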
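Putting the final approach together, here is a sketch with the via tuple in its {:via, Registry, {registry, key}} form and streamers started under a DynamicSupervisor. TopicStreamer is a GenServer stand-in for MyApp.TopicStreamer, and MyApp.StreamerSupervisor is an assumed DynamicSupervisor name.

```elixir
defmodule TopicStreamer do
  # Hypothetical GenServer stand-in for MyApp.TopicStreamer (a GenStage
  # producer in the original); only the naming is relevant here.
  use GenServer

  def start_link(opts) do
    topics = Keyword.fetch!(opts, :topics)
    # The whole topic list is the registry key, matching the approach above.
    name = {:via, Registry, {MyApp.TopicRegistry, topics}}
    GenServer.start_link(__MODULE__, topics, name: name)
  end

  @impl true
  def init(topics), do: {:ok, topics}
end

defmodule StreamerManager do
  # Every topic currently owned by some streamer. Keys are topic lists,
  # so they are flattened into a single set.
  def streaming_topics do
    MyApp.TopicRegistry
    |> Registry.select([{{:"$1", :_, :_}, [], [:"$1"]}])
    |> List.flatten()
    |> MapSet.new()
  end

  # Not atomic: two concurrent callers could both see a topic as free.
  def start(topics) do
    streaming = streaming_topics()

    case topics |> Enum.uniq() |> Enum.reject(&MapSet.member?(streaming, &1)) do
      [] ->
        :noop

      new_topics ->
        DynamicSupervisor.start_child(
          MyApp.StreamerSupervisor,
          {TopicStreamer, topics: new_topics}
        )
    end
  end
end
```

Because the via name is registered again whenever the supervisor restarts a crashed streamer, the registry does not keep stale PIDs, which was the original concern with tracking %{topic => pid} by hand.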