Repo.stream + Flow.from_enumerable

I have the following code:

    Repo.transaction(fn ->
      Blog
      |> where(title: ^title)
      |> Repo.stream()
      |> Flow.from_enumerable()
      |> Flow.each(fn x -> IO.inspect(x) end)
      |> Flow.run()
    end)

When I run this, I get the error: (RuntimeError) cannot reduce stream outside of transaction

Are Ecto and Flow not supposed to be used together in this manner?

Thanks!

I believe the stream must be used in the same process that created the transaction.
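To illustrate what "same process" means here, below is a rough sketch in plain Elixir. `FakeRepo` is a made-up stand-in, not Ecto's actual implementation; it only assumes that the transaction state is tied to the calling process (modelled here with the process dictionary) and that the stream checks for it when it is reduced. A Flow producer stage reduces the enumerable in its own process, much like the task in this sketch.

```elixir
defmodule FakeRepo do
  # Hypothetical stand-in for an Ecto repo, for illustration only.
  # It marks the calling process as "in a transaction".
  def transaction(fun) do
    Process.put(:fake_tx, true)

    try do
      {:ok, fun.()}
    after
      Process.delete(:fake_tx)
    end
  end

  # Lazy stream: the check runs when the stream is reduced,
  # in whichever process does the reducing.
  def stream(rows) do
    Stream.resource(
      fn ->
        unless Process.get(:fake_tx) do
          raise "cannot reduce stream outside of transaction"
        end

        rows
      end,
      fn
        [] -> {:halt, []}
        [row | rest] -> {[row], rest}
      end,
      fn _ -> :ok end
    )
  end
end

# Reduced by the process that opened the transaction: works.
same_process =
  FakeRepo.transaction(fn ->
    FakeRepo.stream([1, 2, 3]) |> Enum.to_list()
  end)

# Reduced by a different process: the check fails over there.
other_process =
  FakeRepo.transaction(fn ->
    stream = FakeRepo.stream([1, 2, 3])

    task =
      Task.async(fn ->
        try do
          Enum.to_list(stream)
        rescue
          e in RuntimeError -> {:error, e.message}
        end
      end)

    Task.await(task)
  end)

IO.inspect(same_process)
IO.inspect(other_process)
```

The first call returns the rows; the second one reports the same "outside of transaction" message, because the task process never entered the fake transaction.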

There was some work towards GenStage integration in Ecto: https://github.com/elixir-ecto/ecto/pull/2028 but it’s been closed.

Darn. Oh well. Guess I need to use Task.async_stream instead.
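For anyone landing here later, a minimal sketch of that Task.async_stream route. The module and function names (`StreamWorkaround`, `each_concurrently`) are made up for this example, and `repo` is assumed to be your own Ecto repo module. As far as I can tell this works because Task.async_stream pulls items from the enumerable in the calling process (the one inside the transaction) and only runs the function in the spawned tasks.

```elixir
defmodule StreamWorkaround do
  # Hypothetical helper, not part of Ecto or Flow.
  # `repo` is any module exposing Ecto's Repo.transaction/2 and Repo.stream/2;
  # `queryable` and `fun` are supplied by the caller.
  def each_concurrently(repo, queryable, fun, opts \\ []) do
    max_concurrency = Keyword.get(opts, :max_concurrency, System.schedulers_online())

    repo.transaction(
      fn ->
        queryable
        |> repo.stream()
        # Rows are pulled by this (transaction) process;
        # only `fun` runs in the spawned tasks.
        |> Task.async_stream(fun, max_concurrency: max_concurrency, timeout: :infinity)
        |> Stream.run()
      end,
      # Long-running streams can exceed the default transaction timeout.
      timeout: :infinity
    )
  end
end
```

Usage would look something like `StreamWorkaround.each_concurrently(Repo, where(Blog, title: ^title), &IO.inspect/1)`. One caveat: the tasks themselves are separate processes, so any queries made inside `fun` should not be expected to run inside this transaction.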

We will likely support this in Ecto 3.1. It won’t make the cut for Ecto 3.0, but it is planned to happen at some point.

Has it made the cut yet?