I’ve been reading documentation over and over again and trying various things, and I’m still not even sure if this is possible. A basic sanity check on this would at least be helpful
I’m looking to process a stream of logs (each line is a JSON object). I want to have a supervised flow which parses and filters them and strips away what I don’t need. Then I would like to have another set of supervised flows (I just have the one for now, but there will be more once I get this working), which consume the stripped down data and do their own thing. Here’s the stripped-down outline of what I have:
# lib/my_app/application.ex
def start(_type, _args) do
children = [
# Starts a worker by calling: MyApp.Worker.start_link(arg)
{MyApp.LogFlow, [
{{MyApp.LogFlow.Consumer, nil}, []}
]},
]
# See https://hexdocs.pm/elixir/Supervisor.html
# for other strategies and supported options
opts = [strategy: :one_for_one, name: MyApp.Supervisor]
Supervisor.start_link(children, opts)
end
# lib/my_app/log_flow.ex
defmodule MyApp.LogFlow do
use Flow
def start_link(specs) do
File.stream!("./some_data.log", [], :line) # Test file. Will eventually come from AWS
|> Stream.cycle() # just for now
|> Flow.from_enumerable()
|> Flow.map(&parse/1)
|> Flow.map(fn %{"payload" => payload} ->
%{payload: Map.take(payload, ~w[path controller action user_id client params])}
end)
|> Flow.into_specs(specs, name: :log_flow)
end
end
# lib/my_app/log_flow/consumer.ex
defmodule MyApp.LogFlow.Consumer do
use Flow
def start_link(_) do
Flow.from_stages([:log_flow])
|> Flow.map(fn q ->
IO.inspect(q , label: :q)
q
end)
|> Flow.start_link(name: :follow_spam)
end
end
From tracing I see that MyApp.LogFlow.Consumer.start_link/1
gets run, but the IO.inspect
inside of the Flow.map
doesn’t. I also get an error like this:
** (FunctionClauseError) no function clause matching in Flow.Coordinator.handle_call/3
Any help would be really appreciated!