It finally worked as intended! I had a hard time producing a reliable Flow stream from my Ecto query, but after that, the Flow part was pure delight.
Here is the join code:
defp sum_metrics_flows(flow1, flow2) do
  Flow.bounded_join(
    :full_outer,
    flow1,
    flow2,
    &elem(&1, 0),
    &elem(&1, 0),
    fn
      {date, %{contacts: c1, views: v1}}, {date, %{contacts: c2, views: v2}} ->
        {date, %{contacts: c1 + c2, views: v1 + v2}}

      date_and_metrics, nil ->
        date_and_metrics

      nil, date_and_metrics ->
        date_and_metrics
    end
  )
end
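To make the behaviour of the join concrete, here is a minimal sketch that runs it on two tiny in-memory flows. It is only an illustration under assumptions: the module name `MetricsJoinExample`, the sample dates, and the sample numbers are all made up, and `Flow.from_enumerable/1` stands in for the real Ecto-backed producer. The function is made public here so the script can call it directly.

```elixir
# Standalone script sketch; assumes network access so Mix.install can fetch Flow.
Mix.install([{:flow, "~> 1.0"}])

defmodule MetricsJoinExample do
  # Hypothetical wrapper module; the function body mirrors the join code above.
  def sum_metrics_flows(flow1, flow2) do
    Flow.bounded_join(
      :full_outer,
      flow1,
      flow2,
      # Both sides are {date, metrics} tuples, so the date is the join key.
      &elem(&1, 0),
      &elem(&1, 0),
      fn
        # Same date on both sides: add the metrics together.
        {date, %{contacts: c1, views: v1}}, {date, %{contacts: c2, views: v2}} ->
          {date, %{contacts: c1 + c2, views: v1 + v2}}

        # Date present on one side only: keep that side unchanged.
        date_and_metrics, nil ->
          date_and_metrics

        nil, date_and_metrics ->
          date_and_metrics
      end
    )
  end
end

# Made-up sample data standing in for the two query-backed flows.
left =
  Flow.from_enumerable([
    {~D[2024-01-01], %{contacts: 1, views: 10}},
    {~D[2024-01-02], %{contacts: 2, views: 20}}
  ])

right =
  Flow.from_enumerable([
    {~D[2024-01-02], %{contacts: 3, views: 30}},
    {~D[2024-01-03], %{contacts: 4, views: 40}}
  ])

# Flow does not guarantee output order, so sort by date before inspecting.
result =
  left
  |> MetricsJoinExample.sum_metrics_flows(right)
  |> Enum.sort_by(&elem(&1, 0), Date)

IO.inspect(result)
```

With this sample data, the date that appears in both flows should come out with its metrics summed, while the dates that appear in only one flow should pass through untouched. Sorting with the `Date` module rather than plain `Enum.sort/1` matters because structs compare structurally by default, which does not give chronological order.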
See this thread for the producer: Code review for an Ecto GenStage Producer