If you’re streaming to external systems (Postgres), you could also use GenStage with a BroadcastDispatcher. That would give you concurrency and back-pressure.
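A minimal sketch of that idea, assuming Parser.parse_xml/1 returns a list of rows and PG.copy/2 writes a batch of rows into an open COPY stream (both are stand-ins for your own modules, not a real API):

defmodule RowProducer do
  use GenStage

  def start_link(filepath) do
    GenStage.start_link(__MODULE__, filepath, name: __MODULE__)
  end

  def init(filepath) do
    # BroadcastDispatcher delivers every event to all subscribed consumers
    # and only dispatches up to the minimum demand across them, which is
    # where the back-pressure comes from.
    {:producer, Parser.parse_xml(filepath), dispatcher: GenStage.BroadcastDispatcher}
  end

  def handle_demand(demand, rows) do
    # Hand out as many rows as were asked for; a real producer would also
    # handle end-of-stream and shutdown.
    {events, rest} = Enum.split(rows, demand)
    {:noreply, events, rest}
  end
end

defmodule TableConsumer do
  use GenStage

  def start_link(copy_sql) do
    GenStage.start_link(__MODULE__, copy_sql)
  end

  def init(copy_sql) do
    {:consumer, copy_sql, subscribe_to: [RowProducer]}
  end

  def handle_events(rows, _from, copy_sql) do
    # Write this batch into the table's COPY stream.
    PG.copy(rows, copy_sql)
    {:noreply, [], copy_sql}
  end
end

Starting one TableConsumer per target table gives each consumer its own copy of every row, so each one can filter out just the rows it needs before writing.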
Thanks. This doesn’t really stream the data, though; it calls each of the other functions for every element. I need to stream almost 2 million rows into a Postgres COPY stream.
defmodule Test do
  def tee(input_stream, functions) do
    # Build one async pipeline per function; Stream.concat runs them
    # back to back, so input_stream is re-enumerated once per function.
    functions
    |> Stream.map(fn fun -> run_async_pipeline(input_stream, fun) end)
    |> Stream.concat()
    |> Stream.run()
  end

  def run_async_pipeline(stream, fun) do
    stream
    |> Task.async_stream(fun)
    |> Stream.map(fn {:ok, item} -> item end) # error handling could be added here
    |> put_in_appropriate_postgres_stream()   # placeholder: pipe into your COPY stream
  end
end
Test.tee([1, 2, 3, 4], [&(&1 * &1), &(&1 + 1), &(&1 - 1)])
My use case parses some XML files and pipes them into two PostgreSQL table streams. I don’t want to parse each XML file twice to do this. Is there a way to do that?
defmodule Loader do
  import Logger, only: [debug: 1]
  import PG

  def load_all do
    # Parse all files concurrently, dedupe rows by id, then COPY
    # everything into the products table in a single stream.
    files()
    |> Enum.with_index(1)
    |> Task.async_stream(&load_file/1, max_concurrency: DataLoader.jobs(), timeout: :infinity)
    |> Stream.flat_map(fn {:ok, products} -> products end)
    |> Stream.uniq_by(fn [id | _] -> id end)
    |> copy("COPY products (id, name) FROM STDIN DELIMITERS E'\\t' NULL ''")
  end

  defp load_file({filepath, index}) do
    debug("#{index} loading #{filepath}")
    Parser.parse_xml(filepath)
  end

  defp files do
    Path.wildcard("#{DataLoader.data_dir()}/products_*.xml")
  end
end
The current code streams the data into a single table through a PostgreSQL COPY stream. However, each input file contains information about the product as well as its product_categories. I need to populate both tables from these input files, and I don’t want to do two parsing passes.
I would start a separate process that streams incoming messages into the second COPY stream, then have the initial stream send those messages to that process as it goes. GenStage is a much fancier version of this that handles all the horrible edge cases (demand, back-pressure, failures) for you.
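A minimal sketch of that hand-rolled version, assuming the parser emits {product_row, category_rows} tuples and reusing the copy/2 call from the Loader above (both assumptions; adapt to your actual row shapes and columns). Note the writer’s mailbox is unbounded, which is exactly the back-pressure problem GenStage solves:

defmodule TwoStreams do
  import PG

  def load(filepath) do
    # Separate process that drains its mailbox into the second COPY stream.
    writer =
      spawn_link(fn ->
        Stream.repeatedly(fn ->
          receive do
            {:row, row} -> row
            :done -> :done
          end
        end)
        |> Stream.take_while(&(&1 != :done))
        |> copy("COPY product_categories (product_id, category) FROM STDIN")
      end)

    # Single parsing pass: product rows flow down this pipeline while
    # category rows are forked off to the writer process.
    Parser.parse_xml(filepath)
    |> Stream.map(fn {product_row, category_rows} ->
      Enum.each(category_rows, &send(writer, {:row, &1}))
      product_row
    end)
    |> copy("COPY products (id, name) FROM STDIN")

    send(writer, :done)
  end
end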