Mix task using Flow does nothing

I created a mix task which spawns a process. It uses Flow, reads from a stream and writes into a file defined in a consumer.

At the end of the day it just spawn some processes.

If I run it through iex it works fine as long as I leave the shell running.

But if I launch it from the command line as a mix task nothing happens, how do you leave the process opened?

 use Flow

  def run([stream]) do
    specs = [{{ProdCon,[]},[]}]
    consumer = [{{Consumer,[]},[]}]

    stream
    |> Flow.from_enumerable()
    |> Flow.through_specs(specs)
    |> Flow.into_specs(consumer)
  end

The reason you’re seeing nothing when running this as a mix task, is because although you spawn a new process, it is linked to the process that started it. So, when the mix task completes, it kills off any process it started.

By contrast, when you run it via iex, it will keep that process open long enough for you to see the result of your function call.

The answer to your question is that you need to supervise that process in order to wait for the result.

I would suggest using Task.Supervisor to supervise the process you start in that function, or you can use a send/receive block.

Hope that helps.

2 Likes

Hello,

Something like this?

{:ok, supervisor} = Task.Supervisor.start_link
task = Task.Supervisor.async(supervisor, fn -> worker(stream) end)
Task.await(task)

def worker(stream) do
  stream
  |> Flow.from_enumerable()
  |> Flow.through_specs(specs)
  |> Flow.into_specs(consumer)
end

That could work. Although, if all you care about is that the worker finishes, you could just use Task.async/1 with Task.await/2

I already tried Task.asyc/1 and didnt work, also with this example my worker doesnt finish, its stopped before.

For example, I have the following IO.puts in my consumer and it nevers get called:

  def handle_events(events, _from, state) do
    IO.puts "processing consumer"
    Enum.each(events, fn(event) ->
      IO.inspect event
    end)

    # We are a consumer, so we would never emit items.
    {:noreply, [], state}
  end