I have a harvesting script that first runs search queries with a given batch_size and an increasing offset. Then I request the details for each item returned in the search.
I omitted my first search from the code below; it gives me the total number of results, which is also why I start at offset batch_size instead of 0. The script works perfectly fine so far, except when the total equals batch_size: in that case Stream.unfold keeps emitting values forever…
I tried my basic setup in iex:
```elixir
iex(1)> Stream.unfold(5, fn n when n >= 5 -> nil; n -> {n, n-1} end) |> Enum.to_list
[]
```
This works as intended, so I suppose the problem has something to do with the Stream and Enum calls that follow. What is the explanation, and how do I fix it?
The code:
```elixir
Stream.unfold(batch_size, fn
  n when n >= total -> nil
  n -> {n, n + batch_size}
end)
|> Stream.map(fn x ->
  url = "#{@base_url}/search?limit=#{batch_size}&offset=#{x}&q=*"
  Logger.info "Fetching next batch: #{url}"
  url
end)
|> Stream.map(&Task.async(fn -> start_query(&1) end))
|> Stream.map(&Task.await(&1, :infinity))
|> Stream.map(&handle_response(&1))
|> Enum.map(&Task.async(fn -> fetch_details(&1) end))
|> Enum.map(&Task.await(&1, :infinity))
```
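The downstream stages only transform the values Stream.unfold emits; nothing in the pipeline feeds back into its accumulator. One way to confirm that is to run the same pipeline shape with the network call stubbed out. This is a sketch: fake_query and run are hypothetical stand-ins, and @base_url, Logger and the detail fetches are left out.

```elixir
# Hypothetical stand-in for start_query/1: no HTTP request,
# it just echoes the URL so the pipeline can run anywhere.
fake_query = fn url -> {:ok, url} end

# Same shape as the pipeline above, minus Logger and the detail fetches.
run = fn batch_size, total ->
  Stream.unfold(batch_size, fn
    n when n >= total -> nil
    n -> {n, n + batch_size}
  end)
  |> Stream.map(fn x -> "/search?limit=#{batch_size}&offset=#{x}&q=*" end)
  |> Stream.map(fn url -> Task.async(fn -> fake_query.(url) end) end)
  |> Stream.map(&Task.await(&1, :infinity))
  |> Enum.to_list()
end

IO.inspect(run.(10, 10))
# => []
IO.inspect(run.(10, 30))
# => [{:ok, "/search?limit=10&offset=10&q=*"}, {:ok, "/search?limit=10&offset=20&q=*"}]
```

If a stub like this terminates but the real script does not, the actual inputs reaching Stream.unfold are worth inspecting before the Stream and Enum stages.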
You don’t want to use Stream.map(&Task.async(...)) |> Stream.map(&Task.await(...)). Streams process items one by one without loading the whole collection, so that pipeline emits an item, starts an async task for it, and immediately awaits that task before moving on to the next item. You are not leveraging concurrency at all. Maybe that’s why you have the impression the system is not working as expected?
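A small, self-contained way to see this is to record, in the calling process, when each task is started and when it is awaited. Trace and its helpers are hypothetical, and the dummy tasks do no real work; events are sent to self(), so their order is exactly the caller's order.

```elixir
defmodule Trace do
  # Stream version: each element passes through both maps before the next
  # element is pulled, so every task is awaited before the next one starts.
  def run(:stream, count) do
    1..count
    |> Stream.map(fn i -> spawn_task(i) end)
    |> Stream.map(fn task -> await_task(task) end)
    |> Stream.run()

    collect()
  end

  # Enum version: the first map starts all tasks, then the second awaits them.
  def run(:enum, count) do
    1..count
    |> Enum.map(fn i -> spawn_task(i) end)
    |> Enum.map(fn task -> await_task(task) end)

    collect()
  end

  defp spawn_task(i) do
    send(self(), {:event, {:spawn, i}})
    # A real task would perform the HTTP request here; this one returns i.
    {i, Task.async(fn -> i end)}
  end

  defp await_task({i, task}) do
    Task.await(task, :infinity)
    send(self(), {:event, {:await, i}})
    i
  end

  # Drains the recorded events from the mailbox, oldest first.
  defp collect(acc \\ []) do
    receive do
      {:event, e} -> collect([e | acc])
    after
      0 -> Enum.reverse(acc)
    end
  end
end

IO.inspect(Trace.run(:stream, 3))
# => [spawn: 1, await: 1, spawn: 2, await: 2, spawn: 3, await: 3]
IO.inspect(Trace.run(:enum, 3))
# => [spawn: 1, spawn: 2, spawn: 3, await: 1, await: 2, await: 3]
```

With Stream the requests effectively run one at a time; with Enum all of them are in flight before the first await. The standard library also provides Task.async_stream/3 (with a :max_concurrency option) for bounded concurrency over a stream, which is a different technique from the one discussed here.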
Now all batches are fetched concurrently, but that does not solve my initial problem. Somehow Stream.unfold never returns nil when the initial batch_size equals total, so it never stops emitting values (now concurrently, instead of waiting for the previous batch to finish).
```elixir
iex(1)> batch_size = 10
10
iex(2)> total = 50
50
iex(3)> Stream.unfold(batch_size, fn
...(3)> n when n >= total -> nil;
...(3)> n -> {n, n + batch_size}
...(3)> end) |> Enum.to_list
[10, 20, 30, 40]
```
Another example:
```elixir
iex(4)> batch_size = 10
10
iex(5)> total = 10
10
iex(8)> Stream.unfold(batch_size, fn
...(8)> n when n >= total -> nil;
...(8)> n -> {n, n + batch_size}
...(8)> end) |> Enum.to_list
[]
```
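Since these sessions show the unfold terminating for integer inputs, one way to take it out of the equation in the real script is to build the offsets with something finite by construction. A minimal sketch, assuming Elixir 1.12 or later for stepped ranges; HarvestOffsets.offsets/2 is a hypothetical name. The guard is also an addition: in Erlang term ordering every number compares less than every binary, so if total ever reached the unfold as a string (an assumption, for example a value taken straight from a decoded response; nothing in the thread shows this), n >= total would never be true and the stream would never end. With the guard, such a call raises a FunctionClauseError instead.

```elixir
defmodule HarvestOffsets do
  # Hypothetical helper: yields the same offsets as the unfold above,
  # i.e. batch_size, 2 * batch_size, ... for every offset strictly below total.
  #
  # The guards reject non-integer input (e.g. total arriving as "10"),
  # because a number never compares >= a string in Erlang term order.
  def offsets(batch_size, total)
      when is_integer(batch_size) and batch_size > 0 and is_integer(total) do
    # A stepped range is finite by construction; it is empty whenever
    # total <= batch_size, since first > last with a positive step.
    Enum.to_list(batch_size..(total - 1)//batch_size)
  end
end

IO.inspect(HarvestOffsets.offsets(10, 50))
# => [10, 20, 30, 40]
IO.inspect(HarvestOffsets.offsets(10, 10))
# => []
```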