Stop a stream from the outside

I have a stream that queries a database and emits the search results.

Depending on the inputs it can run for a while.

I now want to implement a function that evaluates the results and stops the stream as soon as there is a result that I like.

My first idea was to use something like Enum.find for streams (unfortunately this doesn’t exist).

I found out that I can achieve what I want with Stream.transform:

list_of_elements = Stream.transform(
  db_stream,
  false,
  fn(e, found) ->
    if found do
      {:halt, found}
    else
      if e == what_im_looking_for do
        {[e], true}
      else
        {[], false}
      end
    end
  end
)
# Stream.transform returns a lazy stream; turn it into a list for the case below
|> Enum.to_list()

case list_of_elements do
  [] -> IO.puts "not found"
  [elem] -> IO.puts "found it: #{inspect(elem)}"
end

This works. The only downside: because I cannot emit an element together with {:halt, acc}, the stream runs for one more round, meaning one db query is executed unnecessarily.

Is there a way around this, or maybe even a better way to achieve what I’m trying to do?

Enum.find works on streams. What doesn’t work about:

db_stream |> Enum.find(fn e -> e == what_im_looking_for end)

For some reason I thought Enum.find() would process the whole stream.

Your snippet works, thanks!

It consumes the stream only until it finds a match. If nothing matches, it consumes the whole stream and returns nil.
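
If you want to convince yourself of this, here is a minimal sketch. The db_stream below is only a hypothetical stand-in for the real database stream (an assumption, not the original code): every element it produces sends a message to the current process, so the number of "queries" that actually ran can be counted afterwards.

```elixir
# Hypothetical stand-in for the database stream: producing an element
# records a "query" by sending a message to the current process.
db_stream =
  Stream.map(1..10, fn n ->
    send(self(), {:queried, n})
    n
  end)

# Drains the mailbox and counts the recorded "queries".
count_queries = fn count_queries, acc ->
  receive do
    {:queried, _n} -> count_queries.(count_queries, acc + 1)
  after
    0 -> acc
  end
end

# A match exists: the stream is only consumed up to the match.
found = Enum.find(db_stream, fn e -> e == 3 end)
queries_when_found = count_queries.(count_queries, 0)
IO.inspect({found, queries_when_found})
# => {3, 3}

# No match: the whole stream is consumed and the result is nil.
missing = Enum.find(db_stream, fn e -> e == 42 end)
queries_when_missing = count_queries.(count_queries, 0)
IO.inspect({missing, queries_when_missing})
# => {nil, 10}
```

In the first call only three elements are produced, so no extra round is run after the match, which is the behaviour the Stream.transform version was missing.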