Stream.unfold keeps emitting values, not sure why

I have a harvesting script that first runs search queries with a given batch_size and an increasing offset. Then I request the details for each item returned in the search.

I omitted my first search from the code below; it returns the total number of results, which is also why I start at offset batch_size rather than 0. The script works fine so far, except when total equals batch_size: in that case Stream.unfold keeps emitting values forever…

I tried my basic setup in iex:

```
iex(1)> Stream.unfold(5, fn n when n >= 5 -> nil; n -> {n, n-1} end) |> Enum.to_list
[]
```

That works as intended, so I suppose the problem has something to do with the Stream and Enum calls that follow. What is the explanation, and how do I fix it?

The code:

```elixir
# Inside a module that has `require Logger` and defines @base_url.
# Offsets batch_size, 2 * batch_size, ... below total
# (offset 0 is covered by the omitted first search).
Stream.unfold(batch_size, fn
  n when n >= total -> nil
  n -> {n, n + batch_size}
end)
|> Stream.map(fn x ->
  url = "#{@base_url}/search?limit=#{batch_size}&offset=#{x}&q=*"
  Logger.info("Fetching next batch: #{url}")
  url
end)
|> Stream.map(&Task.async(fn -> start_query(&1) end))
|> Stream.map(&Task.await(&1, :infinity))
|> Stream.map(&handle_response/1)
|> Enum.map(&Task.async(fn -> fetch_details(&1) end))
|> Enum.map(&Task.await(&1, :infinity))
```
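For comparison, here is a minimal sketch of the same offset sequence built from `Stream.iterate/2` and `Stream.take_while/2` instead of a hand-written unfold. The module and function names are made up for illustration; for positive integer arguments it should yield the same offsets as the `Stream.unfold/2` call in the question.

```elixir
defmodule IterateSketch do
  # Hypothetical helper: yields batch_size, 2 * batch_size, ...
  # for as long as the value stays below total.
  def offsets(batch_size, total) do
    batch_size
    |> Stream.iterate(&(&1 + batch_size))
    |> Stream.take_while(&(&1 < total))
  end
end

IterateSketch.offsets(10, 50) |> Enum.to_list()
# => [10, 20, 30, 40]

IterateSketch.offsets(10, 10) |> Enum.to_list()
# => []
```

Separating "generate forever" from "stop below total" keeps the stopping condition in one obvious place, which can make it easier to inspect when debugging.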

You don’t want to use Stream.map(&Task.async/1) |> Stream.map(&Task.await/1). Streams process items one by one, without loading the whole collection, so the pipeline above emits an item, starts an async task for it, and then immediately awaits that task before the next item is emitted. You are not leveraging any concurrency. Maybe that’s why you have the impression the system is not working as expected?
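One common way to get real concurrency while staying lazy is `Task.async_stream/3` (available since Elixir 1.4), which runs a bounded number of tasks at a time over an enumerable. The sketch below is an assumption about how the pipeline could look, not the poster's code: `base_url` is passed in instead of `@base_url`, logging is left out, and `start_query/1`, `handle_response/1` and `fetch_details/1` are hypothetical stubs that only tag their input.

```elixir
defmodule AsyncStreamSketch do
  # Hypothetical stand-ins for the poster's start_query/1, handle_response/1
  # and fetch_details/1; they only tag their input so the pipeline runs.
  def start_query(url), do: {:response, url}
  def handle_response({:response, url}), do: url
  def fetch_details(item), do: {:details, item}

  def run(base_url, batch_size, total) do
    Stream.unfold(batch_size, fn
      n when n >= total -> nil
      n -> {n, n + batch_size}
    end)
    |> Stream.map(fn offset ->
      "#{base_url}/search?limit=#{batch_size}&offset=#{offset}&q=*"
    end)
    # Runs start_query/1 in several tasks at once (by default up to
    # System.schedulers_online/0) and emits {:ok, result} in input order.
    |> Task.async_stream(&start_query/1, timeout: :infinity)
    |> Stream.map(fn {:ok, response} -> handle_response(response) end)
    |> Task.async_stream(&fetch_details/1, timeout: :infinity)
    |> Enum.map(fn {:ok, details} -> details end)
  end
end
```

The `:max_concurrency` option of `Task.async_stream/3` can cap how many requests are in flight at once, which may matter when the stream feeds a remote search API.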


Oh, right.

Now all batches are fetched concurrently, but that does not solve my initial problem. Somehow the Stream.unfold function never returns nil when the initial batch_size equals total, so it never stops emitting values (now concurrently, instead of waiting for the previous batch to finish).

What are the values of batch_size and total?

José Valim, Founder and Director of R&D, Plataformatec (http://www.plataformatec.com.br/)

The values do not seem to matter: batch_size = 10 with total = 10 behaves the same as 10000 with 10000.

I can’t reproduce it:

```
iex(1)> batch_size = 10
10
iex(2)> total = 50
50
iex(3)> Stream.unfold(batch_size, fn
...(3)>       n when n >= total -> nil;
...(3)>       n -> {n, n + batch_size}
...(3)>       end) |> Enum.to_list
[10, 20, 30, 40]
```

Another example:

```
iex(4)> batch_size = 10
10
iex(5)> total = 10
10
iex(8)> Stream.unfold(batch_size, fn
...(8)>       n when n >= total -> nil;
...(8)>       n -> {n, n + batch_size}
...(8)>       end) |> Enum.to_list
[]
```
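Since both integer sessions terminate as expected, one thing worth ruling out (a guess, not something established in this thread) is the type of `total` by the time it reaches the script. If it arrives as a string such as `"10"` or as `nil`, the guard `n >= total` is never true: Erlang's term ordering places every number before atoms and binaries, so `10 >= "10"` and `10 >= nil` are both `false`, and the unfold never stops. On its own this would not explain why only the `total == batch_size` case misbehaves, so treat it as a check rather than the answer. The hypothetical `OffsetsSketch.offsets/2` below keeps the question's generator but adds guards, so a wrong type fails with a `FunctionClauseError` instead of producing an endless stream.

```elixir
defmodule OffsetsSketch do
  # Hypothetical helper: same generator as in the question, but the guards
  # reject a non-integer batch_size or total with a FunctionClauseError
  # instead of letting `n >= total` compare an integer with a string or nil.
  def offsets(batch_size, total)
      when is_integer(batch_size) and batch_size > 0 and is_integer(total) do
    Stream.unfold(batch_size, fn
      n when n >= total -> nil
      n -> {n, n + batch_size}
    end)
  end
end

OffsetsSketch.offsets(10, 10) |> Enum.to_list()
# => []

# OffsetsSketch.offsets(10, "10") raises FunctionClauseError
```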

I see.

Thanks so far. I'm glad it was you who answered; at least now I know the answer probably isn't too obvious. 😉

I will fiddle around with my code some more and see if I notice something.

Cheers!