Stream, continue downstream when upstream halted

I have a stream in a pipeline that takes binary elements from the upstream and transforms them. Now I would like my stream to continue emitting binaries for some extra time even after the upstream producer has halted.
I found some helpful documentation on stackoverflow and was able to “handcraft” my own stream that continues by inserting a Stream.unfold after the upstream halts:

  # Builds a function-based enumerable that runs every element of `enum`
  # through `next/2` and, once `enum` is exhausted, keeps going with 500
  # empty binaries as input.
  def stream(enum, analog_echo) do
    # Suspending after each element lets do_stream/4 pull one result at a
    # time; the transformed frames travel back as the accumulator.
    transform = fn frames, _acc -> {:suspend, next(frames, analog_echo)} end

    # When upstream halted - emit 500 more using empty binary as input
    filler = Stream.duplicate(<<>>, 500)

    upstream = fn command -> Enumerable.reduce(enum, command, transform) end
    tail = fn command -> Enumerable.reduce(filler, command, transform) end

    fn command, reducer -> do_stream([tail], upstream, command, reducer) end
  end

  # Reducer loop behind the function enumerable built in `stream/2`. It follows
  # the `Enumerable.reduce/3` contract: the third argument is the command coming
  # from the downstream reducer and the return value is a reduce result.
  #
  #   * `next_enums` - continuations of the sources still waiting for their turn
  #   * `next_enum`  - continuation of the source currently being drained. It
  #     takes a reduce command and is expected to suspend after every element,
  #     with that (already transformed) element as its accumulator, as the step
  #     function in `stream/2` does.
  #   * `fun` - the downstream reducer, called as `fun.(element, acc)`
  #
  # Returns `{:suspended, acc, continuation}`, `{:halted, acc}` or `{:done, acc}`.
  defp do_stream(next_enums, next_enum, {:suspend, acc}, fun) do
    {:suspended, acc, &do_stream(next_enums, next_enum, &1, fun)}
  end

  defp do_stream(_next_enums, next_enum, {:halt, acc}, _fun) do
    # Downstream is done with us: halt the suspended source too, so that a
    # resource-backed source gets the chance to clean up. Sources still queued
    # in the list were never started and need no cleanup.
    next_enum.({:halt, []})
    {:halted, acc}
  end

  defp do_stream(next_enums, next_enum, {:cont, acc}, fun) do
    case next_enum.({:cont, []}) do
      {:suspended, frames, next_enumx} ->
        do_stream(next_enums, next_enumx, fun.(frames, acc), fun)

      # Current source is finished (`:done` or `:halted`), pick next stream in list
      {_, _} ->
        case next_enums do
          # Every source is exhausted: this is a normal end of enumeration, which
          # the protocol reports as `:done` (`:halted` is for reducer-requested halts).
          [] ->
            {:done, acc}

          # Move on without emitting anything; only elements produced by a
          # source are handed to the downstream reducer.
          [h | t] ->
            do_stream(t, h, {:cont, acc}, fun)
        end
    end
  end

This works, but it took some trial and error to get it working and I honestly do not fully understand the details here. Has anyone encountered a similar problem, and/or is there a Stream library function already available that can take several Enumerables as input and pick the next in turn when the first has halted?

This sounds like Stream.concat/1.

An example similar to yours could be written like this:

iex(1)> s1 = 1..10
1..10

iex(2)> s2 = 1..20
1..20

iex(3)> filler = Stream.cycle([nil]) |> Stream.take(20)
#Stream<[
  enum: #Function<65.33009823/2 in Stream.unfold/2>,
  funs: [#Function<59.33009823/1 in Stream.take_after_guards/2>]
]>

# NOTE: not all of filler will get used

iex(4)> s1_with_filler = Stream.concat([s1, filler])
#Function<63.33009823/2 in Stream.transform/3>

iex(6)> Stream.zip(s1_with_filler, s2) |> Enum.to_list()
[
  {1, 1},
  {2, 2},
  {3, 3},
  {4, 4},
  {5, 5},
  {6, 6},
  {7, 7},
  {8, 8}, 
  {9, 9},
  {10, 10},
  {nil, 11},
  {nil, 12},
  {nil, 13},
  {nil, 14},
  {nil, 15},
  {nil, 16},
  {nil, 17},
  {nil, 18},
  {nil, 19},
  {nil, 20}
]

Thank you, I didn’t see that. So much better now!

  # Lazily maps every element of `enum` through `next/2`; once `enum` ends,
  # 500 empty binaries are fed through the same transformation.
  def stream(enum, analog_echo) do
    # When upstream halted - emit 500 more with empty binary as input
    filler = Stream.duplicate(<<>>, 500)

    enum
    |> Stream.concat(filler)
    |> Stream.map(&next(&1, analog_echo))
  end