Linking "use Flow" modules?

I’ve been reading documentation over and over again and trying various things, and I’m still not even sure if this is possible. A basic sanity check on this would at least be helpful :grin:

I’m looking to process a stream of logs (each line is a JSON object). I want to have a supervised flow which parses and filters them and strips away what I don’t need. Then I would like to have another set of supervised flows (I just have the one for now, but there will be more once I get this working), which consume the stripped down data and do their own thing. Here’s the stripped-down outline of what I have:

# lib/my_app/application.ex
  def start(_type, _args) do
    children = [
      # Starts a worker by calling: MyApp.Worker.start_link(arg)
      {MyApp.LogFlow, [
        {{MyApp.LogFlow.Consumer, nil}, []}
      ]},
    ]

    # See https://hexdocs.pm/elixir/Supervisor.html
    # for other strategies and supported options
    opts = [strategy: :one_for_one, name: MyApp.Supervisor]
    Supervisor.start_link(children, opts)
  end

# lib/my_app/log_flow.ex
defmodule MyApp.LogFlow do
  use Flow

  def start_link(specs) do
    File.stream!("./some_data.log", [], :line) # Test file.  Will eventually come from AWS
    |> Stream.cycle() # just for now
    |> Flow.from_enumerable()
    |> Flow.map(&parse/1)
    |> Flow.map(fn %{"payload" => payload} ->
      %{payload: Map.take(payload, ~w[path controller action user_id client params])}
    end)
    |> Flow.into_specs(specs, name: :log_flow)
  end
end

# lib/my_app/log_flow/consumer.ex
defmodule MyApp.LogFlow.Consumer do
  use Flow

  def start_link(_) do
    Flow.from_stages([:log_flow])
    |> Flow.map(fn q ->
      IO.inspect(q , label: :q)

      q
    end)
    |> Flow.start_link(name: :follow_spam)
  end
end

From tracing I see that MyApp.LogFlow.Consumer.start_link/1 gets run, but the IO.inspect inside of the Flow.map doesn’t. I also get an error like this:

** (FunctionClauseError) no function clause matching in Flow.Coordinator.handle_call/3

Any help would be really appreciated!

As a quick update:

My first fix to this was to create a GenServer which did a “Broadcast” to all potential worker flow processes. This was OK, but I realized that I was sending every since log message when the workers wouldn’t need everything. So after thinking for a little while, I was reading the advice in the Flow documentation to prefer multiple sources to one source, I realized that I should have each worker start up it’s own start to the flow which filters the log messages that it specifically needs. That way it only gets the log messages that are needed but it also should hopefully scale well in the future because the producers are separate.