Synchronize multiple streams

Hi there,

I’m running some computations over a lot of Postgres data. I have two streams of data (Repo.stream/2) coming from different tables, and I would like to merge them into a single output stream containing the computed values.

What would be the best approach?

Thanks for your help!

Compute them within the DB, if possible?

Do both streams yield exactly the same amount of records?

No, they don’t: the two streams return different numbers of records.
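
The record count matters because, if both streams had exactly the same length and the same date order, Stream.zip/2 could simply pair them up by position. With unequal lengths, zip silently stops at the end of the shorter input. A small sketch with made-up data (the lists stand in for the two Repo.stream/2 results):

```elixir
# Hypothetical data standing in for two Repo.stream/2 results;
# any enumerable (list or lazy stream) behaves the same way here.
stream_a = [{~D[2024-01-01], 1}, {~D[2024-01-02], 2}]
stream_b = [{~D[2024-01-01], 10}, {~D[2024-01-02], 20}, {~D[2024-01-03], 30}]

# Stream.zip/2 pairs elements by position and stops as soon as
# either input runs out, so the 2024-01-03 entry is silently dropped.
zipped = stream_a |> Stream.zip(stream_b) |> Enum.to_list()
```

Positional pairing also goes wrong whenever a day is missing from one side, because every later pair is then misaligned.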

However, every entry has a date key, and since I need to compute day by day, my idea was to chunk each stream by date (using Stream.chunk_by/2). That should give roughly the same number of chunks on both sides (one per day), although some days may have data in only one of the streams.
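
A minimal, stdlib-only sketch of that idea, under a few assumptions: both queries are ordered by date ascending (for example with an ORDER BY on the date column), each record is a map with a :date field holding a Date, and the names DateMerge, chunk_by_date/1 and merge/2 are hypothetical. Stream.chunk_by/2 groups consecutive records of the same day, and merge/2 then walks the two chunked streams in lockstep, like the merge step of merge sort, emitting {date, left_records, right_records} with [] on the side that has no data for that day. It pulls one element at a time from each side by suspending Enumerable.reduce/3, so everything stays lazy.

```elixir
defmodule DateMerge do
  # Hypothetical helper: lazily merges two streams that are both
  # sorted by :date ascending, one day at a time.

  # Group consecutive records sharing a :date into {date, records}.
  def chunk_by_date(stream) do
    stream
    |> Stream.chunk_by(& &1.date)
    |> Stream.map(fn [first | _] = records -> {first.date, records} end)
  end

  # Emits {date, left_records, right_records}; the side with no
  # data for that date gets [].
  def merge(left, right) do
    Stream.resource(
      fn -> {start(chunk_by_date(left)), start(chunk_by_date(right))} end,
      &step/1,
      fn {l, r} ->
        halt(l)
        halt(r)
      end
    )
  end

  defp step({:done, :done} = acc), do: {:halt, acc}

  defp step({{{date, recs}, cont}, :done}),
    do: {[{date, recs, []}], {next(cont), :done}}

  defp step({:done, {{date, recs}, cont}}),
    do: {[{date, [], recs}], {:done, next(cont)}}

  defp step({{{ld, lrecs}, lcont} = l, {{rd, rrecs}, rcont} = r}) do
    case Date.compare(ld, rd) do
      :eq -> {[{ld, lrecs, rrecs}], {next(lcont), next(rcont)}}
      :lt -> {[{ld, lrecs, []}], {next(lcont), r}}
      :gt -> {[{rd, [], rrecs}], {l, next(rcont)}}
    end
  end

  # Start reducing `enum`, suspending after every element so that
  # elements can be pulled one at a time.
  defp start(enum) do
    next(fn command -> Enumerable.reduce(enum, command, &suspend_each/2) end)
  end

  defp suspend_each(item, _acc), do: {:suspend, item}

  # Resume a suspended reduction: returns {item, continuation} or :done.
  defp next(cont) do
    case cont.({:cont, nil}) do
      {:suspended, item, next_cont} -> {item, next_cont}
      {status, _acc} when status in [:done, :halted] -> :done
    end
  end

  # Halt a partially consumed side so its source can clean up.
  defp halt({_item, cont}), do: cont.({:halt, nil})
  defp halt(:done), do: :ok
end
```

Each emitted {date, left_records, right_records} tuple can then be reduced to the per-day result. With Repo.stream/2 as inputs, the merged stream still has to be consumed inside Repo.transaction/2, as with any Repo.stream/2.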

That could be an idea.
But given how complex the processing is, I would end up with an unmaintainable piece of s…QL :speak_no_evil:
If possible, I’d prefer to stick with Elixir’s Stream APIs :slight_smile:

That depends on how certain you are about this point.

If I were you, I’d devise a temporary record type in the DB that describes things along the lines of “this piece of data has 7 of the 10 pieces it needs to be complete”, and just process both streams independently.

What does merging these streams mean then? Are they put end to end? Intermixed? If intermixed, according to what criterion?

I’m doing some maths combining the entries of both streams.

I think I finally found the way to go: the Flow library, and in particular https://hexdocs.pm/flow/Flow.html#bounded_join/7

It should let me join my streams and also improve performance by using multiple cores. I will let you know how it turns out!

It finally worked as intended! I had a hard time producing a reliable Flow from my Ecto query, but after that, the Flow part was pure delight :blush:

Here is the join code:

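  defp sum_metrics_flows(flow1, flow2) do
    Flow.bounded_join(
      # keep every date that appears in either flow
      :full_outer,
      flow1,
      flow2,
      # both flows emit {date, metrics} tuples, so join on the date
      &elem(&1, 0),
      &elem(&1, 0),
      fn
        # date present in both flows: sum the metrics
        {date, %{contacts: c1, views: v1}}, {date, %{contacts: c2, views: v2}} ->
          {date, %{contacts: c1 + c2, views: v1 + v2}}

        # date present in only one flow: the other side is nil,
        # so the existing entry is passed through unchanged
        date_and_metrics, nil -> date_and_metrics
        nil, date_and_metrics -> date_and_metrics
      end
    )
  end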
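
For comparison, when both inputs are small enough to fit in memory (for instance in a unit test of the summing logic), the same full-outer-join-and-sum can be expressed in plain Elixir with Map.merge/3: keys present in only one map are kept as-is, and the callback runs only for dates present in both. This is a sketch, not a replacement for the parallel Flow join; MetricsSum and sum_metrics/2 are hypothetical names, and it assumes each date appears at most once per input.

```elixir
defmodule MetricsSum do
  # Hypothetical in-memory counterpart of sum_metrics_flows/2.
  # Assumes each list holds at most one {date, metrics} tuple per date
  # (Map.new/1 would keep only the last one for a duplicated date).
  def sum_metrics(list1, list2) do
    Map.merge(Map.new(list1), Map.new(list2), fn _date, m1, m2 ->
      # only called for dates present in both inputs
      %{contacts: m1.contacts + m2.contacts, views: m1.views + m2.views}
    end)
    # maps are unordered, so sort by date (module sorter needs Elixir 1.10+)
    |> Enum.sort_by(&elem(&1, 0), Date)
  end
end
```

Given the same {date, %{contacts: _, views: _}} tuples as the flows, it returns one tuple per date with the metrics summed where both sides had data.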

See this thread for the producer: Code review for an Ecto GenStage Producer
