+ Flow.from_enumerable

I have the following code

Repo.transaction(fn -> Blog |> where(title: ^title) |> |> Flow.from_enumerable() |> Flow.each(fn x -> IO.inspect x end) |> end)

When I run this, I get the error: (RuntimeError) cannot reduce stream outside of transaction

Is Ecto and Flow not supposed to be used in this manner?


1 Like

I believe the stream must be used in the same process that created the transaction.

There was some work towards GenStage integration in Ecto: but it’s been closed.

1 Like

Darn. Oh well. Guess I need to use Task.async_stream :confused:

We will support this likely on Ecto 3.1. It won’t make the cut for Ecto 3.0 but it is planned to happen at some point.


Has it made the cut yet?