How to halt stream while returning a last value

Hello,

I have this example where I would like a stream transformation to return the whole input list up to and including the third atom.

input = ["a", "b", "c", "d", :x, :y, "e", "f", :z, "g", "h", "i", :aa, :bb, "j"]

Expected output:

["a", "b", "c", "d", :x, :y, "e", "f", :z]

All I can do for now is to return:

["a", "b", "c", "d", :x, :y, "e", "f"]

Of course, it would be very easy without the following constraint: no further elements from the stream should be processed once the final atom has been found.

Do you know how I should do that?

Thank you!

Some test code:

defmodule Demo do
  @moduledoc """
  Reproduction of the "halt a stream while still emitting a last value" problem.

  `run/0` tries to keep every element up to and including the third atom
  without classifying any element past it. As written, the third atom is
  missing from the result: the reducer halts on it instead of emitting it.
  """

  @doc """
  Runs the demo pipeline, prints the result with the label "final" and
  returns it.

  For the hard-coded input the result is
  `["a", "b", "c", "d", :x, :y, "e", "f"]`, i.e. the third atom (`:z`) is
  not part of the output.
  """
  def run do
    input = ["a", "b", "c", "d", :x, :y, "e", "f", :z, "g", "h", "i", :aa, :bb, "j"]

    wanted_atoms = 3

    input
    # Classifier function. In the real application it costs money to run, so
    # it must never be called for an element whose result will be discarded.
    # The "g" clause raises to prove that nothing past the third atom is
    # classified.
    |> Stream.map(fn
      "g" -> raise ~s(should not process "g")
      value when is_atom(value) -> {:atom, value}
      value -> {:other, value}
    end)
    # Take 3 atoms if we find them in the stream, take the whole stream
    # otherwise.
    |> Stream.transform(
      _start_fun = fn -> 0 end,
      _reducer = fn
        # Third atom reached: halting here stops the stream, but the atom is
        # only stored in the accumulator, not emitted.
        {:atom, value}, atom_count when atom_count == wanted_atoms - 1 -> {:halt, {:last, value}}
        {:atom, value}, atom_count -> {[value], atom_count + 1}
        {:other, value}, atom_count -> {[value], atom_count}
      end,
      _last_fun = fn
        # Not called when the reducer returns :halt, so this clause never
        # gets the chance to emit the stored atom.
        {:last, value} -> {[value], nil}
        n when is_integer(n) -> {:halt, nil}
      end,
      _after_fun = fn _acc -> :ok end
    )
    |> Enum.to_list()
    |> IO.inspect(limit: :infinity, label: "final")
  end
end

Demo.run()

Have you tried using something like Stream.take_while/2?

I guess you’d need to write your own code integrating with the Enumerable protocol to do that. As far as I know, none of the functions in Stream allow you to emit a value to the resulting enumerable and, in the same step, stop consuming items from the source enumerable.

You can do it with a sentinel value, since each step of Stream.transform can emit multiple results:

    # Emit a sentinel right after the last wanted atom; the take_while below
    # stops at the first value equal to the sentinel, so the sentinel must
    # not occur among the real values.
    |> Stream.transform(
      fn -> 0 end,
      fn
        # Still below the wanted number of atoms: emit and count.
        {:atom, atom}, seen when seen + 1 < wanted_atoms ->
          {[atom], seen + 1}

        # Last wanted atom: emit it together with the sentinel.
        {:atom, atom}, _seen ->
          {[atom, :__no_more_atoms], :ok}

        # Anything else passes through without touching the counter.
        {_tag, value}, seen ->
          {[value], seen}
      end,
      fn _acc -> :ok end
    )
    |> Stream.take_while(fn item -> item != :__no_more_atoms end)

The transform produces a stream with an extra value on the end when the last wanted atom is seen, then take_while snips it off and terminates the stream.

Thank you guys!

Alright, no time to dig into the inner workings of Stream.transform today. Your solution is good enough, which is the perfect flavor of good.

# Keeps every element up to and including the third atom, then stops
# consuming the input; the result is printed with the label "final".
input = ["a", "b", "c", "d", :x, :y, "e", "f", :z, "g", "h", "i", :aa, :bb, "j"]

wanted_atoms = 3

# A fresh reference is used as the end marker: the input only holds strings
# and atoms, so no element can compare equal to it.
stop_ref = make_ref()

# Tags each element; raises on "g" to prove that elements past the third
# atom are never classified.
classify = fn
  "g" -> raise ~s(should not process "g")
  item when is_atom(item) -> {:atom, item}
  item -> {:other, item}
end

# Emits each value; the last wanted atom is emitted together with the marker.
emit_with_marker = fn
  {:atom, atom}, seen when seen == wanted_atoms - 1 -> {[atom, stop_ref], nil}
  {:atom, atom}, seen -> {[atom], seen + 1}
  {:other, other}, seen -> {[other], seen}
end

input
|> Stream.map(classify)
|> Stream.transform(0, emit_with_marker)
# Cut the stream at the marker so nothing after it is pulled from the input.
|> Stream.take_while(fn item -> item != stop_ref end)
|> Enum.to_list()
|> IO.inspect(limit: :infinity, label: "final")

I have also used Stream.concat for this, e.g. Stream.concat(other_stream, [:ending_value]).