cblavier
Synchronize multiple streams
Hi there,
I’m doing some computation on loads of Postgres data and I’m in the situation where I have two streams of data (Repo.stream/2) coming from different tables that I would like to merge into a single output stream containing the computed values.
What would be the best approach?
Thanks for your help!
Marked As Solved
cblavier
It finally worked as intended! I had a hard time producing a reliable Flow stream from my Ecto query, but after that the Flow stuff is just pure delight.
Here is the join code:
defp sum_metrics_flows(flow1, flow2) do
  Flow.bounded_join(
    :full_outer,
    flow1,
    flow2,
    &elem(&1, 0),
    &elem(&1, 0),
    fn
      {date, %{contacts: c1, views: v1}}, {date, %{contacts: c2, views: v2}} ->
        {date, %{contacts: c1 + c2, views: v1 + v2}}

      date_and_metrics, nil ->
        date_and_metrics

      nil, date_and_metrics ->
        date_and_metrics
    end
  )
end
See this thread for the producer: Code review for an Ecto GenStage Producer
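For anyone who wants to try the join without a database, here is a minimal sketch that feeds two in-memory lists through the same `Flow.bounded_join/6` call. The `MetricsJoin` module name, the sample data, and the use of `Flow.from_enumerable/1` in place of the Ecto-backed producer are all assumptions made for illustration; the real flows in this thread come from a GenStage producer.

```elixir
# Standalone script: fetches Flow from Hex on first run.
Mix.install([{:flow, "~> 1.2"}])

defmodule MetricsJoin do
  # Hypothetical wrapper module; the function body mirrors the join posted above.
  def sum_metrics_flows(flow1, flow2) do
    Flow.bounded_join(
      :full_outer,
      flow1,
      flow2,
      # Both flows emit {date, metrics} tuples, so the date is the join key.
      &elem(&1, 0),
      &elem(&1, 0),
      fn
        # The date is present on both sides: sum the metrics.
        {date, %{contacts: c1, views: v1}}, {date, %{contacts: c2, views: v2}} ->
          {date, %{contacts: c1 + c2, views: v1 + v2}}

        # The date is present on one side only: pass the entry through.
        date_and_metrics, nil ->
          date_and_metrics

        nil, date_and_metrics ->
          date_and_metrics
      end
    )
  end
end

# Made-up sample rows standing in for the two tables.
table_a = [
  {~D[2020-01-01], %{contacts: 1, views: 10}},
  {~D[2020-01-02], %{contacts: 2, views: 20}}
]

table_b = [
  {~D[2020-01-02], %{contacts: 3, views: 30}},
  {~D[2020-01-03], %{contacts: 4, views: 40}}
]

result =
  MetricsJoin.sum_metrics_flows(
    Flow.from_enumerable(table_a),
    Flow.from_enumerable(table_b)
  )
  # A Flow gives no ordering guarantee, so sort by date before inspecting.
  |> Enum.sort_by(&elem(&1, 0), Date)

IO.inspect(result)
```

With this data, the date shared by both lists should come out as a single entry with summed metrics, while the two dates that appear in only one list pass through unchanged.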
Also Liked
benwilson512
What does merging these streams mean then? Are they put end to end? Intermixed? If intermixed, according to what criterion?
cblavier
I’m doing some math between the entries of both streams.
I think I finally found the way to go using the Flow library, especially with Flow — Flow v1.2.4
It will help me join my streams and also improve performance by leveraging multiple cores. I will let you know how it turns out!
hauleth
Compute them within the DB if possible?








