I have a (possibly infinite) stream of events arriving asynchronously, so there is a delay between them. I want to stop the stream as soon as I find a value that’s divisible by seven, and return that value.
I tried Stream.transform/5, because I need to return the very value I’m halting the stream on. I expected that returning :halt on the matching value (42 in this case) would cause the “last” function to be called with 42, so I could emit it. Instead I see the “after” function being called, at which point it’s too late to emit a value, and the “last” function is never called.
Stream.cycle([1, 2, 5, 6, 23, 42, 19])
|> Stream.transform(
  fn -> [] end,
  fn v, a ->
    if rem(v, 7) == 0 do
      {:halt, v}
    else
      {[v], a}
    end
  end,
  fn a -> IO.puts("last #{a}") end,
  fn a -> IO.puts("after #{a}") end
)
|> Enum.take(10)
Does anybody have any suggestions? How would you do that?
With take_while you can tell the stream to close, but the element being processed when the stream closes is not included in the returned ones. I basically need to receive that special element and then close the stream.
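To make the take_while limitation concrete, here is a minimal sketch. It uses a finite list as a hypothetical stand-in for the event stream; the variable names are only for illustration.

```elixir
# Hypothetical finite sample standing in for the asynchronous event stream.
events = [1, 2, 5, 6, 23, 42, 19]

# take_while stops as soon as the predicate fails (at 42),
# but the element that caused the stop is not part of the result.
before_match = Enum.take_while(events, fn v -> rem(v, 7) != 0 end)

IO.inspect(before_match)
# prints [1, 2, 5, 6, 23]
```

The matching value 42 is consumed from the source but never returned, which is exactly the problem described above.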
No, it won’t. Both Enum and Stream iterate their inputs one element at a time. The difference is their output: Stream functions return lazy enumerables, while Enum functions eagerly compute their results.
This closes the stream only when the first item after the one I’m looking for is received. That won’t do: the events come from a network, so it makes no sense to wait for the next event just to return the previous one.
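A small sketch of that difference, assuming a Stream.cycle/1 source as a stand-in for the real events: the Stream call only describes the work, and nothing is pulled from the source until an Enum function consumes it.

```elixir
# Hypothetical infinite source.
events = Stream.cycle([1, 2, 3])

# Stream.map/2 returns a lazy enumerable; no element is processed here.
lazy = Stream.map(events, fn v -> v * 2 end)

# Enum.take/2 is eager: it pulls elements one by one and stops after four.
first_four = Enum.take(lazy, 4)
IO.inspect(first_four)
# prints [2, 4, 6, 2]

# Enum.map/2 on a finite list computes the whole result immediately.
eager = Enum.map([1, 2, 3], fn v -> v * 2 end)
IO.inspect(eager)
# prints [2, 4, 6]
```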
The only issue is a potential infinite loop, if the condition is never met.
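One possible way to guard against that, assuming an upper bound on the number of events is acceptable, is to cap the stream with Stream.take/2 before searching it. The source and the max_events limit below are hypothetical, chosen so that no value ever matches.

```elixir
# Hypothetical source in which no value is divisible by seven.
events = Stream.cycle([1, 2, 3])

# Hypothetical cap on how many events we are willing to inspect.
max_events = 10

result =
  events
  |> Stream.take(max_events)
  |> Enum.find(fn v -> rem(v, 7) == 0 end)

IO.inspect(result)
# prints nil, because the cap was reached without a match
```

Enum.find/2 returns nil when nothing matches, so the caller can tell "not found within the limit" apart from a hit. For a real network source a time-based limit may fit better than a count.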
Enum and Stream both work with any enumerable input, including streams, but you can think of most functions as roughly belonging to one of these categories:
“producer” Enum functions like map/2 and filter/2 that return a new list => the Stream equivalent avoids building the list and returns a new stream instead.
“consumer” Enum functions like reduce/3, sum/1, max/1 that return an accumulator and not a list => they can be used to consume non-infinite streams; they have no equivalent in the Stream module.
“lazy consumer” Enum functions like reduce_while/3, find/2, any?/2 that halt early to return a value => they can be used to consume potentially infinite streams; they have no equivalent in the Stream module.
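Applied to the original question, the “lazy consumer” functions can be sketched as follows. This reuses the Stream.cycle/1 example from the question as a stand-in for the real event source; both calls stop on the matching element itself, without waiting for the next one.

```elixir
# Same sample source as in the question (infinite).
events = Stream.cycle([1, 2, 5, 6, 23, 42, 19])

# Enum.find/2 halts the enumeration at the first match and returns it.
found = Enum.find(events, fn v -> rem(v, 7) == 0 end)
IO.inspect(found)
# prints 42

# Enum.reduce_while/3 does the same, with room to carry extra state
# in the accumulator if needed.
found_again =
  Enum.reduce_while(events, nil, fn v, _acc ->
    if rem(v, 7) == 0 do
      {:halt, v}
    else
      {:cont, nil}
    end
  end)

IO.inspect(found_again)
# prints 42
```

On an infinite source neither call returns if no element ever matches.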