Repo.stream + Flow.from_enumerable

I have the following code:

    Repo.transaction(fn ->
      Blog
      |> where(title: ^title)
      |> Repo.stream()
      |> Flow.from_enumerable()
      |> Flow.each(fn x -> IO.inspect x end)
      |> Flow.run()
    end)

When I run this, I get the error: (RuntimeError) cannot reduce stream outside of transaction

Are Ecto and Flow not supposed to be used in this manner?

Thanks!

I believe the stream must be used in the same process that created the transaction.
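
If that is right, one possible workaround is to keep the enumeration of the stream in the transaction process and hand only plain lists to Flow. This is a rough sketch, not a tested recipe: it assumes the `Repo`, `Blog` and `title` from the question and the `:flow` dependency, and the batch size of 500 is an arbitrary choice.

```elixir
import Ecto.Query

# Assumes Repo, Blog and `title` from the question, plus the :flow dependency.
Repo.transaction(fn ->
  Blog
  |> where(title: ^title)
  |> Repo.stream()
  # Batch rows lazily; 500 is an arbitrary batch size.
  |> Stream.chunk_every(500)
  # Enum.each/2 reduces the stream in this process, inside the transaction.
  |> Enum.each(fn batch ->
    # Each batch is a plain list, so Flow can consume it from its own processes.
    batch
    |> Flow.from_enumerable()
    |> Flow.map(&IO.inspect/1)
    |> Flow.run()
  end)
end)
```

The trade-off is that a new flow is started and stopped for every batch, and the transaction stays open until all batches are processed, so the transaction `:timeout` option may need raising for large tables.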

There was some work towards GenStage integration in Ecto: https://github.com/elixir-ecto/ecto/pull/2028 but it’s been closed.

Darn. Oh well. Guess I need to use Task.async_stream :confused:
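
For anyone wondering why Task.async_stream gets around the error: as far as I can tell, it enumerates its input in the calling process and only runs the given function in separate tasks. Here is a small self-contained sketch of that behaviour. `OwnerBoundStream` is a made-up stand-in for `Repo.stream/1`, not part of Ecto; it raises if it is reduced by any process other than the one that built it.

```elixir
defmodule OwnerBoundStream do
  # Hypothetical stand-in for Repo.stream/1 (not an Ecto API): the stream
  # raises if it is reduced in a process other than the one that built it.
  def new(items) do
    owner = self()

    Stream.resource(
      fn ->
        if self() != owner, do: raise("cannot reduce stream outside of owner process")
        items
      end,
      fn
        [] -> {:halt, []}
        [head | tail] -> {[head], tail}
      end,
      fn _rest -> :ok end
    )
  end
end

results =
  [1, 2, 3]
  |> OwnerBoundStream.new()
  # Elements are pulled in the calling process; only the function runs in tasks.
  |> Task.async_stream(fn x -> x * 2 end, max_concurrency: 2)
  |> Enum.map(fn {:ok, value} -> value end)

IO.inspect(results)
```

With Ecto the same shape would presumably be `Blog |> where(title: ^title) |> Repo.stream() |> Task.async_stream(...) |> Stream.run()` inside `Repo.transaction/2`. Note that any Repo calls made inside the tasks would use their own connections and so would not be part of the transaction.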

We will likely support this in Ecto 3.1. It won’t make the cut for Ecto 3.0, but it is planned to happen at some point.

Has it made the cut yet?

Hey @josevalim, do you know if this is supported in Ecto now? Thanks

It has not. Unless someone works on it (or sponsors someone to do the work), it is unlikely to be added.
