Switching streams in runtime

I have a function which does this

File.stream!("a.txt")
    |> Stream.map(&apply_filter/1)
    |> Stream.cycle

I want to switch the source from a.txt to b.txt in runtime based on a trigger.
How can I achieve this in elixir?

The only way I can think of is by implementing a custom stream handler, and have the stream reducer do this:

receive do
  {:switch, file} -> ...
  after 0 -> read_current_file
end

I don’t recommend doing this, unless you have experience writing your own streams.

2 Likes

Do you want to change the stream source while the stream is in the middle of processing? Or can you just check for the source before the processing starts?

2 Likes

I would like to switch the source while processing

You can make a new stream that just takes the other file streams as input and returns whichever one based on whatever condition. Or it could open a new stream internally to switch. Etc… To do the simple thing of just taking a bit of data until a certain condition is reached you can just do (the 1..10’s can just be swapped with the file streams):

a = File.stream!("...")
|> Stream.take_while(&some_condition_test/1)
|> Stream.concat(File.stream("..."))
|> Stream.whatever...

Or if you want fine grained control and lots of swapping and all, then just make a new stream resource:

Stream.resource(
  fn -> File.open!("first_file") end,
  fn file ->
    file = case some_condition_test(...) do
      nil -> file
      new_file_name ->
        File.close(file)
        file = File.open!(new_file_name)
    end
    case IO.read(file, :line) do
      data when is_binary(data) -> {[data], file}
      _ -> {:halt, file}
    end
  end,
  fn file -> File.close(file) end
)
|> Stream.whatever...
2 Likes