Is there a way to split a stream into two outputs, similar to `tee` on Linux?

Like the title says, I want to pass a stream to two outputs.

I am looking for something like below:

function_which_returns_stream
|> Enum.tee( mapper_for_func1 |> postgrex_stream1, mapper_for_func2 |> postgrex_stream2)

Is there a way to achieve this in Elixir?

You could simply use Enum.map/2 with an anonymous function that calls your other two functions like this:


fn1 = fn x -> IO.puts("1: #{x}") end
fn2 = fn x -> IO.puts("2: #{x}") end

1..3
|> Stream.map(& &1)
# the piped stream is already the first argument, so only the function is passed
|> Enum.map(fn x ->
  fn1.(x)
  fn2.(x)
end)

If you’re streaming to external systems (Postgres), you could also use GenStage with a BroadcastDispatcher. This would give you concurrency and back-pressure.

GenStage seems too heavy for my use case :) If there is no alternative, I’ll probably have to use it.

Thanks. This doesn’t really stream the data; it just calls the other two functions for each element. I need to stream almost 2 million rows into a Postgres stream.

Can you elaborate a bit on your actual scenario?

What about something like this?

defmodule Test do
  def tee(input_stream, functions) do
    Stream.map(functions, fn fun ->
      run_async_pipeline(input_stream, fun)
    end)
    |> Stream.concat
    |> Stream.run
  end

  def run_async_pipeline(stream, fun) do
    stream
    |> Task.async_stream(fun)
    |> Stream.map(fn {:ok, item} -> item end) # error handling could be added here.
    |> put_in_appropriate_postgres_stream() # placeholder, not defined here
  end
end

Test.tee([1, 2, 3, 4], [&(&1 * &1), &(&1 + 1), &(&1 - 1)])

Thanks, this is very close to what I want. However, this seems to reread the stream. I tried it with the following code:

defmodule Test do
  def tee(input_stream, functions) do
    Stream.map(functions, fn fun ->
      run_async_pipeline(input_stream, fun)
    end)
    |> Stream.concat
    |> Stream.run
  end

  def run_async_pipeline(stream, fun) do
    stream
    |> Task.async_stream(fun)
    |> Stream.map(fn {:ok, item} -> item end) # error handling could be added here.
    |> Enum.to_list
    |> IO.inspect(label: "FUN")
  end
end

stream = Stream.unfold(5, fn 0 -> nil; n -> IO.puts("#{n}."); {n, n-1} end)
Test.tee(stream, [&(&1*&1), &(&1+1), &(&1-1)])

And got this output:

5.
4.
3.
2.
1.
FUN: [25, 16, 9, 4, 1]
5.
4.
3.
2.
1.
FUN: [6, 5, 4, 3, 2]
5.
4.
3.
2.
1.
FUN: [4, 3, 2, 1, 0]

My use case parses some XML files and pipes the results to two PostgreSQL table streams. I don’t want to parse each XML file twice to do this. Is there a way to avoid that?

This is the high level code that I have:

defmodule Loader do
  import Logger, only: [debug: 1]
  import PG

  def load_all do
    files()
    |> Enum.with_index(1)
    |> Task.async_stream(&load_file/1, max_concurrency: DataLoader.jobs(), timeout: :infinity)
    |> Stream.flat_map(fn {:ok, products} -> products end)
    |> Stream.uniq_by(fn [id | _] -> id end)
    |> copy("COPY products (id, name) FROM STDIN DELIMITERS E'\\t' NULL ''")
  end

  defp load_file({filepath, index}) do
    debug "#{index} loading #{filepath}"
    Parser.parse_xml(filepath)
  end

  defp files do
    Path.wildcard("#{DataLoader.data_dir}/products_*.xml")
  end
end

The current code streams the data into a PostgreSQL stream for a single table. However, each input file contains info about both the products and the product_categories, and I need to populate both tables from these files. I don’t want to do two parsing passes.
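
One stdlib-only way to avoid the second pass, sketched here under some assumptions: anything you can pass to `Enum.into/2` implements the `Collectable` protocol, so both destinations can be opened with `Collectable.into/1` and fed from a single `Enum.reduce/3`. The `Tee` module and its `into/3` function are hypothetical names, not an existing API, and plain lists stand in for the two table streams. As far as I know a Postgrex stream for a `COPY ... FROM STDIN` query is also a `Collectable`, but I would expect each table to need its own connection and transaction, so treat that part as untested.

```elixir
defmodule Tee do
  # Hypothetical helper: sends every element of `enumerable` to two
  # collectables in one pass. `map1`/`map2` shape the element for each sink.
  def into(enumerable, {coll1, map1}, {coll2, map2}) do
    {init1, collect1} = Collectable.into(coll1)
    {init2, collect2} = Collectable.into(coll2)

    try do
      Enum.reduce(enumerable, {init1, init2}, fn item, {acc1, acc2} ->
        {collect1.(acc1, {:cont, map1.(item)}), collect2.(acc2, {:cont, map2.(item)})}
      end)
    catch
      kind, reason ->
        # Let both collectors clean up, then re-raise the original error.
        collect1.(init1, :halt)
        collect2.(init2, :halt)
        :erlang.raise(kind, reason, __STACKTRACE__)
    else
      {acc1, acc2} -> {collect1.(acc1, :done), collect2.(acc2, :done)}
    end
  end
end

# Same kind of source as in the re-read test above: it prints when enumerated.
source = Stream.unfold(3, fn 0 -> nil; n -> IO.puts("#{n}."); {n, n - 1} end)

# Lists are stand-ins for the two table streams.
{squares, incremented} = Tee.into(source, {[], &(&1 * &1)}, {[], &(&1 + 1)})

IO.inspect(squares, label: "SINK 1")      # SINK 1: [9, 4, 1]
IO.inspect(incremented, label: "SINK 2")  # SINK 2: [4, 3, 2]
```

The source is enumerated only once, so `3.`, `2.`, `1.` is printed a single time. Both collectors run in the calling process, which means a slow destination also slows down the other one.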

My 2 cents:

I would start a separate process that turns the messages it receives into the second stream.
Then have the initial stream send each element to that process as a message while it flows through.

GenStage is a much fancier version of this that handles all the horrible edge cases.
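
A rough sketch of the separate-process idea using only `Task` and message passing. The `SideChannel` module and its `start/1`, `tap/3` and `finish/1` functions are made-up names for illustration. There is no back-pressure here: if the second sink is slower than the main pipeline, messages pile up in the task's mailbox, which is one of the edge cases GenStage takes care of.

```elixir
defmodule SideChannel do
  # Starts a task that exposes its mailbox as a lazy stream and passes that
  # stream to `sink_fun` (for example, something that writes to a table).
  def start(sink_fun) do
    Task.async(fn ->
      Stream.repeatedly(fn ->
        receive do
          {:item, _item} = msg -> msg
          :done -> :done
        end
      end)
      |> Stream.take_while(&(&1 != :done))
      |> Stream.map(fn {:item, item} -> item end)
      |> sink_fun.()
    end)
  end

  # Lazily forwards `fun.(item)` to the task while the items flow through.
  def tap(stream, %Task{pid: pid}, fun) do
    Stream.each(stream, fn item -> send(pid, {:item, fun.(item)}) end)
  end

  # Signals the end of input and waits for the sink's result.
  def finish(%Task{pid: pid} = task) do
    send(pid, :done)
    Task.await(task, :infinity)
  end
end

# Enum.to_list/1 stands in for the second Postgres sink.
task = SideChannel.start(&Enum.to_list/1)

main_result =
  1..5
  |> SideChannel.tap(task, &(&1 * 10))
  |> Enum.map(&(&1 + 1))

side_result = SideChannel.finish(task)

IO.inspect(main_result, label: "MAIN")  # MAIN: [2, 3, 4, 5, 6]
IO.inspect(side_result, label: "SIDE")  # SIDE: [10, 20, 30, 40, 50]
```

The input is enumerated once by the main pipeline; the second sink only ever sees the forwarded messages.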

Thanks, it seems like GenStage is the easy way out for this. It would be a lot easier if there were a simpler version :)

You should give it a try, then figure out what a simpler interface would look like, then post that example here and/or make a library to handle it. :)
