Task.async_stream/5 with a stream instead of an Enum

I have a stream of changesets I’d like to insert into my DB.

stream |> Stream.map(& insert/1) |> Stream.run

Now I’m wondering how I can do inserts concurrently. I know there is https://hexdocs.pm/elixir/Task.html#async_stream/5 but my understanding is it requires an Enum instead of a Stream. What is the most elixir way of accomplishing this?

It does require a Enumerable.t, and as far as I remember Stream is an Enumerable.t, so it should “just work”, have you had any problems while trying it?

ahhhhh of course

just trying seems like a solid lesson to be re-learned :slight_smile:

Coming from Ruby-land I just assumed it wouldn’t since in the guides Enum and Stream are being handled differently

Can you elaborate on what you mean by this?

This might be just me reading into it weirdly but under https://elixir-lang.org/getting-started/enumerables-and-streams.html#streams it says

As an alternative to Enum, Elixir provides the Stream module which supports lazy operations:

I took it that they are distinctively different things, that for some reason Streams couldn’t fit the Enum interface. Now after looking at it more in depth, it does make more sense to me.

I think you’re confusing Enum with Enumerable. Enumerable is a protocol for emitting values one at a time (basically). The Enum module has functions that take inputs that implement the Enumerable protocol and it operates on those input eagerly. The Stream module has functions that take inputs that implement the Enumerable protocol and it operates on those input lazily. Both take Enumerables as inputs, it’s entirely about whether those inputs are manipulated in an eager or lazy way.

2 Likes

Thanks for the clarification!