Non-greedy `Enum.take_while`?

Up until now I have used this hack to consume from a Stream or Flow, and detect the last element, without attempting to consume past that last element:

[1,2,3]
|> Stream.map( &(&1==2 && [&1,nil] || [&1] ))
|> Stream.flat_map(&(&1))
|> Stream.take_while(&(&1))
|> Enum.to_list()

But is this really the best way to do it?

Especially when using GenStage.Flow, one cannot know that grabbing the next item/event will not block. What I am missing is basically a third Stream.take_while return variant, such as :last, to stop consumption without looking at the next item/event. EDIT: but still include the last seen element in the resulting Enum.

Does this seem reasonable, or am I missing something here?
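For the eager case, the `:last` behaviour described above can be approximated with `Enum.reduce_while/3`, which may return `{:halt, acc}` with the current element already included. This is only a minimal sketch: the module `TakeThrough`, the function `take_through/2` and the counter source are hypothetical names made up for illustration, and the result is a list rather than a lazy stream.

```elixir
defmodule TakeThrough do
  # Hypothetical helper (not part of Enum or Stream).
  # Collects elements up to and including the first one for which
  # `last?` returns a truthy value, then halts the reduction, so the
  # source is never asked for the element after it.
  def take_through(enumerable, last?) when is_function(last?, 1) do
    enumerable
    |> Enum.reduce_while([], fn x, acc ->
      if last?.(x), do: {:halt, [x | acc]}, else: {:cont, [x | acc]}
    end)
    |> Enum.reverse()
  end
end

# Illustrative source: an endless counter that reports every pull
# to the calling process.
source =
  Stream.resource(
    fn -> 1 end,
    fn n ->
      send(self(), {:pulled, n})
      {[n], n + 1}
    end,
    fn _ -> :ok end
  )

result = TakeThrough.take_through(source, &(&1 == 2))
IO.inspect(result)
# prints [1, 2]
```

After this runs, the mailbox of the calling process should contain `{:pulled, 1}` and `{:pulled, 2}` only, which suggests the source was not asked for a third element.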

Maybe if we pull this apart it might make more sense.

my_stream = [1,2,3]
 |> Stream.map( &(&1<=2 && [&1] || [&1,nil] ))
 |> Stream.flat_map(&(&1))
 |> Stream.take_while(&(&1))


  iex(3)> my_stream |> Enum.to_list
    [1, 2, 3]
  iex(4)> my_stream |> Enum.take(1)
    [1]
  iex(5)> my_stream |> Enum.drop_while( fn x -> x > 1 end)
    [1, 2, 3]
  iex(6)> my_stream |> Enum.drop_while( fn x -> x < 2 end)
    [2, 3]

A Stream is mostly just a composition of functions; the interface it exposes can’t be halted. The best way to think of them is as Enumerables that you compute on the fly, rather than reading the next pointer in a linked list. They don’t have a “state” that you can stop and restart.

Now internally the Enumerable protocol does support the kind of “stopping/starting” you are thinking of, but that interface is mostly not exposed (deliberately, is my impression) to either Enum or Stream.
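To make the “stopping/starting” in the protocol concrete, here is a small sketch that drives `Enumerable.reduce/3` by hand with the `:suspend` and `:halt` instructions. The variable names are just for illustration, and this is not how `Enum` or `Stream` are normally meant to be used.

```elixir
# Reducer that asks the enumerable to pause after every element.
step = fn x, acc -> {:suspend, [x | acc]} end

# Consume exactly one element and get back a continuation.
{:suspended, acc1, cont1} = Enumerable.reduce([1, 2, 3], {:cont, []}, step)

# Resume for one more element.
{:suspended, acc2, cont2} = cont1.({:cont, acc1})

# Stop for good, keeping everything seen so far; 3 is never visited.
final = cont2.({:halt, acc2})

IO.inspect(final)
# prints {:halted, [2, 1]}
```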

I can see a need for the kind of thing you want, but it’s not a Stream.

Sorry, I had a typo,

I meant |> Stream.map( &(&1<=2 && [&1] || [&1,nil] )) (as is now corrected).

I don’t mean to restart, so basically a stop is final.

But if the source Enumerable is a resource (as consuming events from a GenStage.Producer or reading from a file would be), then this seems to me a common use case. That’s why I was wondering.

I agree that it is a common use case; however, it is neither a Stream nor an Enum. There have been a number of false starts to “bolt on” this kind of functionality to Stream, and even Jose couldn’t get it to work in a reasonable fashion.

I think Flow is what you might be looking for, but I am just guessing.

Stream.flat_map, Stream.transform and friends allow you to return :halt.

iex> Enum.to_list Stream.flat_map [1, 2, 3], fn _ -> :halt end
[]

Is that what you want?
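For comparison, a sketch of the same idea using the `{:halt, acc}` return value of `Stream.transform/3`. Note the assumption baked into this example: the element that triggers the halt (here `3`) has already been read from the source and is not emitted, so this stops the stream but does not by itself include the last seen element.

```elixir
halting =
  Stream.transform(1..10, nil, fn x, acc ->
    # Emit elements until one greater than 2 shows up, then stop the stream.
    if x > 2, do: {:halt, acc}, else: {[x], acc}
  end)

IO.inspect(Enum.to_list(halting))
# prints [1, 2]
```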

Yes! I didn’t know/notice that it works for Stream.flat_map.
Can I also return :halt from take_*/drop_* and friends?

I just double-checked the docs on hex.pm. Is this some upcoming change?

It has always been supported on Stream.flat_map. If the docs do not mention it, then we should update them. :halt is not supported in take_*/drop_*.