How to halt stream while returning a last value

Hello,

I have this example where I would like a stream transformation to return the whole input list up to and including the third atom.

input = ["a", "b", "c", "d", :x, :y, "e", "f", :z, "g", "h", "i", :aa, :bb, "j"]

Expected output:

["a", "b", "c", "d", :x, :y, "e", "f", :z]

All I can do for now is to return:

["a", "b", "c", "d", :x, :y, "e", "f"]

Of course, it would be very easy without the following constraint: further elements from the stream must not be processed once the final atom has been found in the stream.

Do you know how I should do that?

Thank you!

Some test code:

defmodule Demo do
  @moduledoc """
  Reproduces the problem described in the question: halting a
  `Stream.transform/5` from the reducer drops the element that triggered the
  halt, so the third atom never reaches the output.
  """

  @doc """
  Runs the demo pipeline, prints the resulting list and returns it.

  The input is consumed lazily; the classifier raises on `"g"` to prove that no
  element after the third atom is ever processed.

  The returned list ends just *before* the third atom, because the reducer
  halts without emitting it and the last-fun is not invoked on `:halt`.
  """
  @spec run() :: list()
  def run do
    input = ["a", "b", "c", "d", :x, :y, "e", "f", :z, "g", "h", "i", :aa, :bb, "j"]

    wanted_atoms = 3

    input
    # Classifier function. In real application it costs money to run so we never
    # want to call it if we are not going to process its result.
    |> Stream.map(fn
      "g" -> raise ~s(should not process "g")
      value when is_atom(value) -> {:atom, value}
      value -> {:other, value}
    end)
    # Take 3 atoms if we find them in the stream, take the whole stream
    # otherwise.
    |> Stream.transform(
      _start_fun = fn -> 0 end,
      _redux_fun = fn
        # Third atom reached: halt. The atom is stashed in the accumulator in
        # the hope that the last-fun emits it, which does not happen.
        {:atom, value}, atom_count when atom_count == wanted_atoms - 1 -> {:halt, {:last, value}}
        {:atom, value}, atom_count -> {[value], atom_count + 1}
        {:other, value}, atom_count -> {[value], atom_count}
      end,
      _last_fun = fn
        # Not called when the reducer returns :halt
        {:last, value} -> {[value], nil}
        # Source exhausted before enough atoms were seen: nothing more to emit.
        n when is_integer(n) -> {:halt, nil}
      end,
      _after_fun = fn _acc -> :ok end
    )
    |> Enum.to_list()
    |> IO.inspect(limit: :infinity, label: "final")
  end
end

Demo.run()

Have you tried using something like Stream.take_while/2?

I guess you’d need to write your own code integrating with the Enumerable protocol to do that. Afaik none of the APIs in Stream would allow you to emit a value to the resulting enumerable and at the same time stop consuming more items from the source enumerable.

You can do it with a sentinel value, since each step of Stream.transform can emit multiple results:

    # Count atoms as they pass through. When the last wanted atom shows up,
    # emit it together with a sentinel so the following take_while can stop
    # the stream right after it.
    |> Stream.transform(
      fn -> 0 end,
      fn
        {:atom, atom}, seen when seen + 1 < wanted_atoms ->
          {[atom], seen + 1}

        {:atom, atom}, _seen ->
          {[atom, :__no_more_atoms], :ok}

        {_tag, other}, seen ->
          {[other], seen}
      end,
      fn _acc -> :ok end
    )
    # Drop the sentinel and halt: nothing after it is pulled from the source.
    |> Stream.take_while(fn item -> item != :__no_more_atoms end)

The transform produces a stream with an extra value on the end when the last wanted atom is seen, then take_while snips it off and terminates the stream.

5 Likes

Thank you guys!

Alright, no time to dig into the inner workings of Stream.transform today. Your solution is good enough, which is the perfect flavor of good.

input = ["a", "b", "c", "d", :x, :y, "e", "f", :z, "g", "h", "i", :aa, :bb, "j"]

wanted_atoms = 3

# Unique sentinel emitted right after the last wanted atom; being a fresh
# reference, it cannot collide with any value coming from the input.
stop_ref = make_ref()

# Tags each element; raises on "g" to prove that nothing past the last wanted
# atom is ever classified.
classify = fn
  "g" -> raise ~s(should not process "g")
  item when is_atom(item) -> {:atom, item}
  item -> {:other, item}
end

# Passes values through while counting atoms. On the last wanted atom the
# sentinel is appended so the downstream take_while can end the stream.
emit_until_last_atom = fn
  {:other, item}, seen -> {[item], seen}
  {:atom, item}, seen when seen == wanted_atoms - 1 -> {[item, stop_ref], nil}
  {:atom, item}, seen -> {[item], seen + 1}
end

input
|> Stream.map(classify)
|> Stream.transform(0, emit_until_last_atom)
|> Stream.take_while(fn item -> item != stop_ref end)
|> Enum.to_list()
|> IO.inspect(limit: :infinity, label: "final")

I have also used Stream.concat for this, e.g. Stream.concat(other_stream, [:ending_value]).

2 Likes