Stream: return the element I'm halting on

I have a (possibly infinite) stream of events that arrive asynchronously, so there is a delay between them. I want to stop the stream as soon as I find a value that’s divisible by seven, and return that value.

I tried with Stream.transform/5, because I need to return the very value I’m halting the stream on. So I would expect to :halt on the correct value (in this case 42) and that the “last” function would be called with value 42, so I could emit it. But I see the “after” function being called - at which time it’s too late to emit a value - but not the “last” function.

         Stream.cycle([1, 2, 5, 6, 23, 42, 19])
         |> Stream.transform(
           fn -> [] end,
           fn v, a ->
             if rem(v, 7) == 0 do
               {:halt, v}
             else
               {[v], a}
             end
           end,
           fn a -> IO.puts("last #{a}") end,
           fn a -> IO.puts("after #{a}") end
         )
         |> Enum.take(10)

Does anybody have any suggestions? How would you do that?

Sounds like you want Stream.take_while tbh, but it’s not quite clear what exactly you intend to do with that special last value.

This seems to describe Enum.find/2. Would the following work for your use case?

Stream.cycle([1, 2, 5, 6, 23, 42, 19])
|> Enum.find(& rem(&1, 7) == 0)

In take_while, you can tell the stream to close, but the element you are processing when you close the stream is not included in the ones returned. I basically need to receive that special element and close the stream.
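A minimal sketch of that behaviour, reusing the sample data from the question (the variable name `kept` is only for illustration): Stream.take_while/2 stops at 42, but 42 itself is not part of the result.

```elixir
# take_while emits elements only while the predicate holds; the element
# that makes the predicate fail (42 here) is dropped, not returned.
kept =
  Stream.cycle([1, 2, 5, 6, 23, 42, 19])
  |> Stream.take_while(&(rem(&1, 7) != 0))
  |> Enum.to_list()

IO.inspect(kept)
# [1, 2, 5, 6, 23]
```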

Yes, that’s exactly it - but wouldn’t it materialize the whole stream before processing?

This?

[1, 2, 3, 4, 5]
|> Stream.transform(false, fn
  x, false -> {[x], x == 3}
  _, true -> {:halt, true}
end)
|> Enum.into([])
# [1, 2, 3]

No, it won’t. Both Enum and Stream iterate their inputs one element at a time (reduction by reduction). The difference is their output: Stream functions return lazy enumerables, while Enum functions eagerly calculate their results.

This closes the stream only once the first item after the one I’m looking for is received. That won’t do: the events come from a network, so it makes no sense to wait for the next event just to return the previous one.
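The extra pull can be made visible with a small sketch. It assumes the same Stream.transform/3 pipeline as in the earlier reply and records every element taken from the source by sending a message to the current process; the `{:pulled, x}` message shape and the variable names are made up for this illustration.

```elixir
# Record every element that is actually pulled from the source by
# sending a message to the current process (illustration only).
result =
  [1, 2, 3, 4, 5]
  |> Stream.each(fn x -> send(self(), {:pulled, x}) end)
  |> Stream.transform(false, fn
    x, false -> {[x], x == 3}
    _, true -> {:halt, true}
  end)
  |> Enum.to_list()

# Drain the mailbox to list what was pulled, in order.
pulled =
  Stream.repeatedly(fn ->
    receive do
      {:pulled, x} -> x
    after
      0 -> :done
    end
  end)
  |> Enum.take_while(&(&1 != :done))

IO.inspect(result)
# [1, 2, 3]
IO.inspect(pulled)
# [1, 2, 3, 4]
```

The result stops at 3, but element 4 had to be read from the source before the stream could halt.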

I think Enum.find() will do.

It only reads the elements it needs:

  • find consumes its input lazily and halts as soon as it finds a match
  • it doesn’t need to build a new list

You can confirm it as follows (plus the fact it doesn’t consume infinite time and memory :wink:):

Stream.cycle([1, 2, 5, 6, 23, 42, 19]) 
|> Stream.map(&IO.inspect/1)
|> Enum.find(& rem(&1, 7) == 0)

The only issue is a potential infinite loop, if the condition is never met.
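One possible guard, sketched under the assumption that giving up after a fixed number of events is acceptable (the limit of 1_000 is arbitrary, and this bounds the element count, not the elapsed time): put Stream.take/2 in front of Enum.find/2, which then returns nil when nothing matches.

```elixir
# A source that never yields a multiple of 7, so an unbounded
# Enum.find/2 would loop forever on it.
events = Stream.cycle([1, 2, 5, 6, 23, 19])

result =
  events
  |> Stream.take(1_000)                # arbitrary upper bound on events inspected
  |> Enum.find(&(rem(&1, 7) == 0))

IO.inspect(result)
# nil
```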

Enum and Stream both work with any enumerable input, including streams, but you can think of most functions as roughly belonging to one of these categories:

  • “producer” Enum functions like map/2 / filter/2 that return a new list => the Stream equivalent avoids building the list and returns a new stream instead
  • “consumer” Enum functions like reduce/3, sum/1, max/1 that return an accumulator and not a list => can be used to consume finite streams; they have no equivalent in the Stream module.
  • “lazy consumer” Enum functions like reduce_while/3, find/2, any?/2 that halt early to return a value => can be used to consume potentially infinite streams; they have no equivalent in the Stream module.
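
As a sketch of the “lazy consumer” category applied to the original question: Enum.reduce_while/3 can halt on the matching element and hand it back straight away, here together with the elements seen before it. Returning a `{seen, match}` tuple is just one possible shape, chosen for illustration.

```elixir
# Halt on the first multiple of 7 and return it along with the
# elements that came before it. Nothing after 42 is read.
result =
  Stream.cycle([1, 2, 5, 6, 23, 42, 19])
  |> Enum.reduce_while([], fn v, seen ->
    if rem(v, 7) == 0 do
      {:halt, {Enum.reverse(seen), v}}
    else
      {:cont, [v | seen]}
    end
  end)

IO.inspect(result)
# {[1, 2, 5, 6, 23], 42}
```

If the stream ends without a match, the accumulator list is returned as is, so a caller would need to tell the two shapes apart.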