Operating on a large amount of data

Hello all, I want to operate on a large amount of data: I run a query to get some records, and then for each record I have to perform a costly operation (creating a large number of Redis keys (~1000) for each record's data and then deleting them). I want to use a stream to reduce the strain. My code so far is:

      Repo.transaction(
        fn ->
          query
          |> Repo.stream(max_rows: @query_batch)
          |> Stream.chunk_every(@query_batch)
          |> Stream.each(fn x ->
             Enum.to_list(x) |> Enum.each(fn y -> delete_from_redis(y) end)
          end)
          |> Enum.to_list()
        end,
        timeout: :infinity
      )

In this code, when I tried to use

      Stream.each(x, fn y -> delete_from_redis(y) end)

in place of

      Enum.to_list(x) |> Enum.each(fn y -> delete_from_redis(y) end)

my code did not work. Please explain the reason for that, and also tell me what I can do to make my current code more efficient.

What was the error message?

No error message; the code just did nothing.

Re: your specific question:

Stream.each doesn't run any computation by itself; you have to call a function like Enum.to_list or Stream.run to make the stream actually do its work.

A more general thought: because the pipeline ends in Enum.to_list, this code returns (and holds in memory) all the rows from query, which neutralizes a lot of the benefit of streaming. If you only want the side effects, Stream.run will do the computation but discard the result.
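To make the difference between ending the pipeline with Enum.to_list and ending it with Stream.run concrete, here is a minimal sketch. It assumes an in-memory range in place of Repo.stream (so it runs without a database) and a hypothetical no-op delete_from_redis; only the shape of the pipeline matches the code in the question.

```elixir
# Stand-in for `Repo.stream(query, max_rows: ...)` (assumption: in-memory data,
# no database): a lazy stream of ten "records".
records = Stream.map(1..10, fn id -> %{id: id} end)

# Hypothetical stand-in for delete_from_redis/1; it does nothing here.
delete_from_redis = fn _record -> :ok end

# Building the pipeline does no work yet; it is only a description.
pipeline =
  records
  |> Stream.chunk_every(3)
  |> Stream.each(fn chunk -> Enum.each(chunk, delete_from_redis) end)

# Enum.to_list/1 runs the pipeline AND returns a list holding every chunk,
# so all the records end up in memory at once.
all_chunks = Enum.to_list(pipeline)

# Stream.run/1 runs the same pipeline for its side effects only,
# does not accumulate the results, and returns :ok.
result = Stream.run(pipeline)
```

With the real Repo.stream and an SQL adapter, the Stream.run version still has to be called inside Repo.transaction, just as in the original code, because Ecto only allows that stream to be enumerated within a transaction.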


I read about what Stream.run does, and the documentation says:

Runs the given stream.

This is useful when a stream needs to be run, for side effects, and there is no interest in its return result.

To be honest, I could not make sense of it, so I tried using Stream.each, thinking it would behave like Enum.each and call a function for each value. Can you please explain how Stream.run works?

I tried using Stream.run, assuming that it makes all the stream operations passed into it run. The code I tried is:

      Repo.transaction(
        fn ->
          query
          |> Repo.stream(max_rows: @query_batch)
          |> Stream.chunk_every(@query_batch)
          |> Stream.each(fn x ->
             Stream.each(x, fn y -> delete_from_redis(y) end)
          end)
          |> Stream.run()
        end,
        timeout: :infinity
      )

but it does not work. Can you point out where I am making a mistake?

Stream.each returns a new stream that calls the supplied function as each element is generated. It doesn’t iterate by itself.

Example:

iex(1)> a = [1,2,3]
[1, 2, 3]

iex(2)> stream = Stream.each(a, &IO.inspect/1)
#Stream<[enum: [1, 2, 3], funs: [#Function<38.58486609/1 in Stream.each/2>]]>

iex(3)> Stream.run(stream)
1
2
3
:ok

iex(4)> Enum.to_list(stream)
1
2
3
[1, 2, 3]

Note that at iex(2), stream is bound to a Stream struct, but nothing has been IO.inspect-ed yet.

Stream.run causes the stream to compute values but discards them and returns :ok, while Enum.to_list accumulates and returns the values.


Then shouldn't the code above your reply work? I have added Stream.run there. First I make chunks of data with Stream.chunk_every(@query_batch), then pass them to Stream.each to go through each chunk; each chunk is then passed to another Stream.each to iterate over the records and call the required function for each record; and lastly all of this is passed to Stream.run() to execute it. Sorry if my understanding is incorrect.

Your outer stream does run, but you're calling another Stream.each inside a Stream.each, and that inner one isn't doing anything: it only builds a new stream, which the outer Stream.each then discards without running it:

          |> Stream.each(fn x ->
             Stream.each(x, fn y -> delete_from_redis(y) end)
          end)

The inner Stream.each should probably be an Enum.each: the x list is already loaded into memory, so streaming over it doesn't accomplish anything.
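A small sketch of the difference, under similar assumptions: an in-memory range in place of Repo.stream, and a hypothetical delete_from_redis that only increments an Agent counter so the calls can be counted. After the nested Stream.each version the counter is still 0; after the Enum.each version it is 6.

```elixir
# Hypothetical counter standing in for Redis: counts how many records were "deleted".
{:ok, counter} = Agent.start_link(fn -> 0 end)
delete_from_redis = fn _record -> Agent.update(counter, &(&1 + 1)) end

# Stand-in for `Repo.stream(...) |> Stream.chunk_every(...)`: six records in chunks of three.
chunks = Stream.chunk_every(1..6, 3)

# Broken version: the inner Stream.each only *builds* a stream for each chunk.
# The outer Stream.each ignores that return value, so nothing ever runs it.
chunks
|> Stream.each(fn chunk -> Stream.each(chunk, delete_from_redis) end)
|> Stream.run()

broken_count = Agent.get(counter, & &1)

# Fixed version: each chunk is already an in-memory list, so Enum.each
# iterates it eagerly and calls delete_from_redis for every record.
chunks
|> Stream.each(fn chunk -> Enum.each(chunk, delete_from_redis) end)
|> Stream.run()

fixed_count = Agent.get(counter, & &1)
```

Stream.run drives every stage composed into the pipeline itself, but it has no way of knowing about a stream that one of those stages merely returns as a value.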


OK, understood. Thanks!