Cloning a stream: One stream, two consumers

Last week I was busy rebuilding a part of our application using streams. The pipeline is a straight-forward map-reduce with the map step involving a lot of IO-heavy work and the reduce just being aggregations on the results (counting, diffing etc). The results of the map step, however needed to be written to some external file, too.

enum --> map --> reduce(aggregate)
             \-> reduce(write to file)

So in unix this would be something like tee. The reducer for the file-part was already expecting a stream, which is why I tried to think of a way to “clone” the main stream after the map-step. I came up with this combination of a separate process and Stream.resource paired with a receive block. Stream.transform/4 gives us a nice way to deal with setup and tear-down:

defmodule Streamy do
  def clone(stream, work) do
    stream
    |> Stream.transform(
      fn ->
        Task.async(fn ->
          Stream.resource(
            fn -> nil end,
            fn _ ->
              receive do
                {:data, data} -> {[data], nil}
                :halt -> {:halt, nil}
              end
            end,
            fn _ -> nil end
          )
          |> work.()
        end)
      end,
      fn data, task ->
        send(task.pid, {:data, data})
        {[data], task}
      end,
      fn task ->
        send(task.pid, :halt)
      end
    )
  end
end

do_stuff = fn stream ->
  stream
  |> Stream.into(File.stream!("some_file.txt"))
  |> Stream.run()
end

1..10
|> Stream.map(fn i ->
  Process.sleep(100)
  i * 2
end)
|> Streamy.clone(do_stuff)
|> Enum.reduce(0, &Kernel.+/2)
|> IO.inspect()

This works but has some drawbacks

  • The second, cloned stream is not “pulling” any data, it’s waiting for messages from the main stream
  • Thus, there’s no back pressure and if the main stream is very fast in comparison with the cloned one, its mailbox may be overrun
  • No one can really look at this code and go “ahh so that’s what it does” (mostly because of the many nested anonymous functions and because I left it in this gory state)
  • From a lib-standpoint Streamy.clone/2 looks rather clean, but in our case we do need the return value of the secondary stream, so it becomes even more elaborate.

Now if you think, that’s a lot of work in order to avoid using gen_stage/flow, I guess I agree after finding out about the GenStage.BroadcastDispatcher. But I’m still not sure how to tackle this cleanly with flow. This pipeline is not the most complex so I’d like to avoid defining the consumer/producers manually, if possible. Any thoughts?

Cheers

Hey @hauptbenutzer welcome! Have you seen https://hexdocs.pm/flow/Flow.html? This is built on GenStage so it’s backpressure based but provides an Enum / Stream like API.

3 Likes

Hi Ben! I’ve seen flow, in fact, I’ve mentioned it in my original post :wink:
[10 hours later]
After sifting through both flow and gen_stage docs, I figured out a way to make this work. So there is GenStage.stream which seems to be what I want as the “sink” for my flow. The problem is, it takes the PIDs of already running stages and as far as I can tell, I can’t get to these internals using the flow API. I instead had to manually define a producer/consumer with manually housekeeping subscriptions and exits as described in the Flow.into_stages docs. With that setup I can start both streams (one being in another process). In this snippet demand and stages are set to low numbers so the log output is more meaningful.

I’m not sure if this is a good idea, though.

defmodule StreamyCP do
  use GenStage

  def start_link(_opts) do
    GenStage.start_link(__MODULE__, %{})
  end

  def init(state) do
    {:producer_consumer, state}
  end

  def handle_events(events, _from, state) do
    {:noreply, events, state}
  end

  def handle_subscribe(:consumer, _, _from, state) do
    {:automatic, state}
  end

  def handle_subscribe(:producer, _options, from, state) do
    new_state = Map.update(state, :subs, [], fn subs -> [from | subs] end)
    {:automatic, new_state}
  end

  def handle_cancel(_, from, state) do
    new_state = %{state | subs: List.delete(state.subs, from)}
    case new_state do
      %{subs: []} ->
        {:stop, :normal, new_state}
      _ ->
      {:noreply, [],  new_state}
    end
  end
end

{:ok, first} = StreamyCP.start_link(nil)
{:ok, second} = StreamyCP.start_link(nil)

1..10
|> Flow.from_enumerable(max_demand: 1, min_demand: 0, stages: 1)
|> Flow.map(fn i ->
  Process.sleep(100)
  i * 2
end)
|> Flow.partition(stages: 2, dispatcher: GenStage.BroadcastDispatcher, max_demand: 1, min_demand: 0)
|> Flow.into_stages([first, second])

second_result =
  Task.async(fn ->
    GenStage.stream([{second, cancel: :transient}])
    |> Stream.each(&IO.puts("writes out #{&1}"))
    |> Stream.map(&Kernel.to_string/1)
    |> Enum.join(",")
  end)

GenStage.stream([{first, cancel: :transient}])
|> Enum.reduce(0, &Kernel.+/2)
|> IO.inspect()

IO.inspect(Task.await(second_result))