I have a stream in a pipeline that takes binary elements from the upstream and transforms them. Now I would like my stream to continue emitting binaries for some extra time even after the upstream producer has halted.
I found some helpful documentation on Stack Overflow and was able to “handcraft” my own stream that keeps going by inserting a Stream.unfold after the upstream halts:
# Builds a lazy stream that pulls elements from `enum`, passes each one through
# `next/2`, and, once `enum` is exhausted, keeps feeding empty binaries through
# the same step so the stream continues for a while after the upstream ends.
#
# Parameters:
#   * `enum`        - the upstream enumerable; each element is handed to `next/2`.
#   * `analog_echo` - passed unchanged as the second argument of `next/2`
#                     (its meaning is defined by `next/2`, not visible here).
#   * `tail_frames` - how many empty binaries (`<<>>`) to feed in after `enum`
#                     is exhausted. Defaults to 500, the previously hard-coded
#                     value, so existing two-argument callers are unaffected.
#
# Returns a two-argument function `(acc, fun)` implementing the reduce contract
# via `do_stream/4`, i.e. a value usable wherever a function-based stream is.
def stream(enum, analog_echo, tail_frames \\ 500)
    when is_integer(tail_frames) and tail_frames >= 0 do
  # Suspend after every element so do_stream/4 can pull exactly one frame at a
  # time; the transformed frame is carried back in the accumulator slot.
  step = fn frames, _acc -> {:suspend, next(frames, analog_echo)} end
  upstream = &Enumerable.reduce(enum, &1, step)

  # After the upstream ends, feed `tail_frames` empty binaries through the same
  # step. The guard above matters: a negative count would never reach 0 and the
  # unfold would run forever.
  flush_input =
    Stream.unfold(tail_frames, fn
      0 -> nil
      remaining -> {<<>>, remaining - 1}
    end)

  flush = &Enumerable.reduce(flush_input, &1, step)

  &do_stream([flush], upstream, &1, &2)
end
# Reduces a chain of sources as if they were a single enumerable.
#
# Arguments:
#   * `next_enums` - list of sources still waiting, each a one-argument
#                    function taking a reduce command (`{:cont, _}` or
#                    `{:halt, _}`).
#   * `next_enum`  - the source currently being drained, same shape. It must
#                    suspend after every element, returning
#                    `{:suspended, element, continuation}`.
#   * third arg    - the downstream command: `{:cont, acc}`, `{:suspend, acc}`
#                    or `{:halt, acc}`.
#   * `fun`        - the downstream reducer, called as `fun.(element, acc)`.
#
# Returns `{:done, acc}` when every source is exhausted, `{:halted, acc}` when
# the downstream reducer halts, or `{:suspended, acc, continuation}`.
defp do_stream(next_enums, next_enum, {:suspend, acc}, fun) do
  {:suspended, acc, &do_stream(next_enums, next_enum, &1, fun)}
end

defp do_stream(_next_enums, next_enum, {:halt, acc}, _fun) do
  # Tell the active source to stop so it can release anything it holds
  # (e.g. the cleanup function of a Stream.resource/3). Simply dropping the
  # suspended continuation would leave that cleanup un-run.
  next_enum.({:halt, []})
  {:halted, acc}
end

defp do_stream(next_enums, next_enum, {:cont, acc}, fun) do
  case next_enum.({:cont, []}) do
    {:suspended, frames, next_enumx} ->
      do_stream(next_enums, next_enumx, fun.(frames, acc), fun)

    # The active source finished: move on to the next one, if any.
    {_, _} ->
      case next_enums do
        # Every source ran to completion, so this is a normal end of
        # enumeration (:done), not a reducer-initiated stop (:halted).
        [] ->
          {:done, acc}

        # Continue directly with the next source. Nothing is emitted at the
        # switch itself, so downstream only ever sees elements produced by a
        # source.
        [h | t] ->
          do_stream(t, h, {:cont, acc}, fun)
      end
  end
end
This works, but it took some trial and error to get it working, and I honestly do not fully understand the details here. Has anyone encountered a similar problem, and/or is there a Stream library already available that can take several Enumerables as input and pick the next one in turn when the current one has halted?