Last week I was busy rebuilding a part of our application using streams. The pipeline is a straightforward map-reduce, with the map step involving a lot of IO-heavy work and the reduce step just aggregating the results (counting, diffing etc.). The results of the map step, however, also needed to be written to an external file:
```
enum --> map --> reduce(aggregate)
           \--> reduce(write to file)
```
So in Unix terms this would be something like `tee`. The reducer for the file part was already expecting a stream, which is why I tried to think of a way to “clone” the main stream after the map step. I came up with this combination of a separate process and `Stream.resource/3` paired with a `receive` block. `Stream.transform/4` gives us a nice way to deal with setup and tear-down:
```elixir
defmodule Streamy do
  def clone(stream, work) do
    stream
    |> Stream.transform(
      # Setup: spawn a task that runs `work` on a stream fed by messages.
      fn ->
        Task.async(fn ->
          Stream.resource(
            fn -> nil end,
            fn _ ->
              receive do
                {:data, data} -> {[data], nil}
                :halt -> {:halt, nil}
              end
            end,
            fn _ -> nil end
          )
          |> work.()
        end)
      end,
      # Reducer: forward each element to the clone and pass it through.
      fn data, task ->
        send(task.pid, {:data, data})
        {[data], task}
      end,
      # Tear-down: stop the clone and wait for it to finish
      # (without the await, the linked task may still be working
      # when the parent moves on).
      fn task ->
        send(task.pid, :halt)
        Task.await(task, :infinity)
      end
    )
  end
end
```
```elixir
do_stuff = fn stream ->
  stream
  |> Stream.into(File.stream!("some_file.txt"))
  |> Stream.run()
end

1..10
|> Stream.map(fn i ->
  Process.sleep(100)
  i * 2
end)
|> Streamy.clone(do_stuff)
|> Enum.reduce(0, &Kernel.+/2)
|> IO.inspect()
```
This works but has some drawbacks:

- The second, cloned stream is not “pulling” any data; it is just waiting for messages from the main stream.
- Thus there is no back-pressure, and if the main stream is fast compared to the cloned one, the clone's mailbox may grow without bound.
- No one can really look at this code and go “ahh, so that's what it does” (mostly because of the many nested anonymous functions, and because I left it in this gory state).
- From a library standpoint `Streamy.clone/2` looks rather clean, but in our case we also need the return value of the secondary stream, so it becomes even more elaborate.
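One way to patch the back-pressure problem within this design is a simple acknowledgement protocol: the main stream only emits the next element after the clone has confirmed picking up the previous one, so at most one message is ever in flight. This is a sketch of my own, not part of the module above — the `{:ack, pid}` message and the `StreamyAck` name are made up for illustration:

```elixir
defmodule StreamyAck do
  def clone(stream, work) do
    stream
    |> Stream.transform(
      fn ->
        Task.async(fn ->
          Stream.resource(
            fn -> nil end,
            fn _ ->
              receive do
                {:data, data, from} ->
                  # Acknowledge receipt so the producer may send the next element.
                  send(from, {:ack, self()})
                  {[data], nil}

                :halt ->
                  {:halt, nil}
              end
            end,
            fn _ -> nil end
          )
          |> work.()
        end)
      end,
      fn data, task ->
        send(task.pid, {:data, data, self()})

        # Block until the clone has picked the element up: the clone's
        # mailbox is now bounded instead of growing without limit.
        receive do
          {:ack, _pid} -> :ok
        end

        {[data], task}
      end,
      fn task ->
        send(task.pid, :halt)
        Task.await(task, :infinity)
      end
    )
  end
end
```

The trade-off is lock-step execution: the main stream now runs no faster than the slowest clone, which is exactly what back-pressure means.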
Now, if you think that's a lot of work just to avoid using GenStage/Flow, I guess I agree, after finding out about `GenStage.BroadcastDispatcher`. But I'm still not sure how to tackle this cleanly with Flow. This pipeline is not the most complex, so I'd like to avoid defining the producers/consumers manually, if possible. Any thoughts?
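For reference, this is roughly what the broadcast approach looks like with GenStage — a sketch I haven't wired into the real pipeline, assuming the `gen_stage` package; `Numbers` and `Sink` are illustrative stand-ins for the map and reduce steps:

```elixir
Mix.install([{:gen_stage, "~> 1.2"}])

defmodule Numbers do
  use GenStage

  def init(first), do: {:producer, first, dispatcher: GenStage.BroadcastDispatcher}

  # With BroadcastDispatcher every consumer receives every event, and the
  # producer only sees demand once *all* consumers have asked for more —
  # exactly the back-pressure the hand-rolled clone lacks.
  def handle_demand(demand, counter) do
    events = Enum.to_list(counter..(counter + demand - 1))
    {:noreply, events, counter + demand}
  end
end

defmodule Sink do
  use GenStage

  def init(label), do: {:consumer, label}

  def handle_events(events, _from, label) do
    IO.inspect(events, label: label)
    {:noreply, [], label}
  end
end

{:ok, producer} = GenStage.start_link(Numbers, 0)
{:ok, aggregate} = GenStage.start_link(Sink, :aggregate)
{:ok, writer} = GenStage.start_link(Sink, :write_to_file)

GenStage.sync_subscribe(aggregate, to: producer, max_demand: 5)
GenStage.sync_subscribe(writer, to: producer, max_demand: 5)

Process.sleep(100)
```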
Cheers