Synchronize multiple streams

It finally worked as intended! I had a hard time producing a reliable Flow stream from my Ecto query, but once that was sorted out, the Flow part was pure delight :blush:

Here is the join code:

  defp sum_metrics_flows(flow1, flow2) do
    Flow.bounded_join(
      :full_outer,
      flow1,
      flow2,
      &elem(&1, 0),
      &elem(&1, 0),
      fn
        {date, %{contacts: c1, views: v1}}, {date, %{contacts: c2, views: v2}} ->
          {date, %{contacts: c1 + c2, views: v1 + v2}}

        date_and_metrics, nil -> date_and_metrics
        nil, date_and_metrics -> date_and_metrics
      end
    )
  end
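
For anyone wondering what the `:full_outer` join ends up producing, here is a rough sketch in plain Elixir, without Flow, that applies the same merge rule to two small lists. The module name `MetricsJoinSketch`, the `full_outer/2` helper, and the sample data are all made up for illustration, and it assumes at most one row per date on each side; it only mimics the join semantics eagerly and is not how Flow does it internally.

```elixir
defmodule MetricsJoinSketch do
  # Hypothetical module, not part of Flow: an eager illustration of the join above.

  # Both sides have a row for the same date: sum the metrics.
  def merge({date, %{contacts: c1, views: v1}}, {date, %{contacts: c2, views: v2}}),
    do: {date, %{contacts: c1 + c2, views: v1 + v2}}

  # Only one side has a row for that date: keep it unchanged.
  def merge(date_and_metrics, nil), do: date_and_metrics
  def merge(nil, date_and_metrics), do: date_and_metrics

  # Full outer join of two lists of {date, metrics} tuples, keyed by date.
  # Assumes at most one row per date in each list.
  def full_outer(left, right) do
    left_by_date = Map.new(left, fn row -> {elem(row, 0), row} end)
    right_by_date = Map.new(right, fn row -> {elem(row, 0), row} end)

    (Map.keys(left_by_date) ++ Map.keys(right_by_date))
    |> Enum.uniq()
    |> Enum.sort()
    |> Enum.map(fn date ->
      merge(Map.get(left_by_date, date), Map.get(right_by_date, date))
    end)
  end
end

# Sample data (invented); dates are ISO strings so a plain sort is chronological.
left = [
  {"2019-01-01", %{contacts: 1, views: 10}},
  {"2019-01-02", %{contacts: 2, views: 20}}
]

right = [
  {"2019-01-02", %{contacts: 3, views: 30}},
  {"2019-01-03", %{contacts: 4, views: 40}}
]

result = MetricsJoinSketch.full_outer(left, right)
IO.inspect(result)
```

Dates present on only one side pass through untouched, and the date present on both sides gets its `contacts` and `views` summed, which is the behaviour the `nil` clauses in the join function are there for.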

See this thread for the producer: Code review for an Ecto GenStage Producer
