How to get Stream to run?

Hi there,

I’ve got the following function to read from a CSV file and insert the data into the address table of my database.

  def process_csv_file(%User{type: "customer"} = customer, file) do
    File.stream!(file)
    |> Stream.map(& &1)
    |> CSV.decode(separator: ?;, headers: false)
    |> Stream.map(fn x ->
      case x do
        {:ok, [name, address, zip_code, city, country]} ->
          create_address(customer, %{
            name: name,
            address_one: address,
            zip_code: zip_code,
            city: city,
            country: country
          })

        _ ->
          IO.puts("Something went wrong...")
      end
    end)
    |> Stream.run()
  end

If I try to run it, it returns :ok and nothing happens.

But if I use Enum instead of Stream, everything runs fine. What am I doing wrong here?

(In this case it isn’t very important, it’s just a small file. But I’m hoping to learn something from this.)

Taking the answer from the docs for Stream.run/1:

This is useful when a stream needs to be run, for side effects, and there is no interest in its return result.

Examples

Open up a file, replace all # by % and stream to another file without loading the whole file in memory:

File.stream!("/path/to/file")
|> Stream.map(&String.replace(&1, "#", "%"))
|> Stream.into(File.stream!("/path/to/other/file"))
|> Stream.run()

You'd reach for Stream.run over Enum.map when the side effect is the whole point of iterating the stream and you don't need a return value. Stream.run discards the result of each iteration instead of collecting them into a list, which can be more efficient in most cases (like the example the docs give).
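To make the difference concrete, here is a minimal sketch on a plain list (the numbers and variable names are just for illustration, not from the original function). Both calls force the lazy stream, but only one keeps the results:

```elixir
numbers = [1, 2, 3]

# Stream.map/2 is lazy: nothing is computed until the stream is consumed.
doubled = Stream.map(numbers, fn n -> n * 2 end)

# Stream.run/1 forces the stream, discards every element, and returns :ok.
run_result = Stream.run(doubled)

# Enum.to_list/1 also forces the stream, but collects the elements.
list_result = Enum.to_list(doubled)

IO.inspect(run_result)
IO.inspect(list_result)
```

So a return value of :ok from Stream.run is expected on its own; it says nothing about what the mapped function returned.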

That’s not the case for your function there, as far as I understand, as you want to process a CSV and return a list of addresses from it.

Thanks Kelvinst. But I’m afraid that I still don’t understand this.

In the example from the docs: it reads a file → does some operation on it → writes to another file.

In my function: it reads a file → does some operation on it → and it calls a function to write to the database.

Isn’t the side effect more or less the same in both cases?

How are you running this code exactly? And what does the code look like when you use Enum?

It does seem like you want Stream.each rather than Stream.map (you’re not interested in the return value of the operation), which you then pipe into Stream.run.
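A rough sketch of that shape, assuming a hypothetical `insert` function standing in for the real database call (here it only sends a message to the current process so the side effect is observable), and using String.split in place of the CSV library:

```elixir
parent = self()

# Hypothetical stand-in for create_address/2: records the row as a message.
insert = fn row -> send(parent, {:inserted, row}) end

result =
  ["Alice;Main St 1", "Bob;Side St 2"]
  |> Stream.map(&String.split(&1, ";"))
  # Stream.each/2 runs the function for its side effect and passes
  # each element through unchanged.
  |> Stream.each(insert)
  # Stream.run/1 consumes the stream; without it nothing above executes.
  |> Stream.run()

IO.inspect(result)
```

The pipeline still returns :ok, but `insert` has been called once per row by the time it does.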

This can be vastly simplified if you just put Enum.each at the end, e.g.:

    File.stream!(file)
    |> Stream.map(& &1)
    |> CSV.decode(separator: ?;, headers: false)
    |> Enum.each(fn x ->
      # ...
    end)

BTW, if all the final step does is insert records in a DB, it can be massively parallelized with Task.async_stream instead. But let’s not make this even more confusing.
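For reference only, a minimal sketch of what that could look like. The `insert` function is again a hypothetical stand-in for the database call, and the `max_concurrency` and `timeout` values are arbitrary assumptions; in a real app the useful concurrency would likely be bounded by the database connection pool size.

```elixir
# Hypothetical stand-in for the insert: returns an :ok tuple per row.
insert = fn row -> {:ok, String.upcase(row)} end

results =
  ["a", "b", "c"]
  # Each element is processed in its own task, up to max_concurrency at a time.
  |> Task.async_stream(insert, max_concurrency: 4, timeout: 5_000)
  # Task.async_stream/3 is lazy too, so it has to be consumed.
  |> Enum.to_list()

IO.inspect(results)
```

Each result comes back wrapped in an extra `{:ok, value}` tuple by Task.async_stream, and by default the results keep the input order.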

I see, so you don’t want the actual list of new addresses then? AFAIK it should work with Stream.run() then. When you say “nothing happens” you mean the addresses are not being created?