Understanding Stream.resource function

I have a CSV file and I want to read the data in chunks of, say, 100 rows. To do that I'm using Stream functions.


def read_csv_file() do
  File.stream!("annual.csv")
  |> CSV.decode(headers: true, strip_fields: true)
  |> Stream.chunk_every(100)
  |> Enum.map(fn {_, data} -> data end)
end

The data looks like this:

 [%{
    "Industry_aggregation_NZSIOC" => "Level 1",
    "Industry_code_ANZSIC06" => "ANZSIC06 division A",
    "Industry_code_NZSIOC" => "AA",
    "Industry_name_NZSIOC" => "Agriculture, Forestry and Fishing",
    ...
  },
  %{
    "Industry_aggregation_NZSIOC" => "Level 1",
    "Industry_code_ANZSIC06" => "ANZSIC06 division A",
    "Industry_code_NZSIOC" => "AA",
    ...
  },
  %{
    "Industry_aggregation_NZSIOC" => "Level 1",
    "Industry_code_ANZSIC06" => "ANZSIC06 division A",
    ...
  },
  %{"Industry_aggregation_NZSIOC" => "Level 1", ...},
  %{...},
  ...
]

This code takes a long time to read all the records. Also, Stream.chunk_every is not working here.

Would Stream.resource be useful here, or can we improve the performance without it?
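
A possible reason Stream.chunk_every appears not to work: once the stream is chunked, each element is a list of up to 100 items, so a `fn {_, data} -> data end` clause no longer matches. The sketch below assumes the decoder emits `{:ok, row}` tuples (as the `{_, data}` pattern in the snippet suggests) and uses made-up stand-in data rather than the real file. It unwraps the tuples first and chunks afterwards.

```elixir
# Stand-in for decoded rows; the {:ok, row} shape is an assumption.
decoded = for n <- 1..5, do: {:ok, %{"row" => n}}

chunks =
  decoded
  # Unwrap each tagged tuple while the elements are still single rows.
  |> Stream.map(fn {:ok, data} -> data end)
  # Only then group the rows into batches (2 here, 100 in the question).
  |> Stream.chunk_every(2)
  |> Enum.to_list()

IO.inspect(chunks)
```

The last chunk may hold fewer rows than the requested size.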

Not sure which CSV library you are using. I would recommend nimble_csv.

alias NimbleCSV.RFC4180, as: CSV

def read_csv_file() do
  "annual.csv"
  |> File.stream!()
  |> CSV.parse_stream()
  |> Stream.chunk_every(100)
  # Each element is now a list of up to 100 rows, so there is no tuple to unwrap.
  |> Enum.to_list()
end
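
On the Stream.resource part of the question: File.stream! already reads the file lazily, line by line, so Stream.resource is mostly useful when you need to wrap a resource that has no ready-made stream. As a rough illustration only, here is a sketch of a line-by-line reader built with Stream.resource/3. The module name `LineReader` and the temporary demo file are made up for this example.

```elixir
defmodule LineReader do
  # Lazily emits the lines of a file, roughly what File.stream!/1 does for you.
  def stream(path) do
    Stream.resource(
      # start_fun: open the file when the stream is first consumed
      fn -> File.open!(path) end,
      # next_fun: emit one line at a time, halt at end of file
      fn file ->
        case IO.read(file, :line) do
          :eof -> {:halt, file}
          line when is_binary(line) -> {[line], file}
        end
      end,
      # after_fun: always close the file, even if the consumer stops early
      fn file -> File.close(file) end
    )
  end
end

# Hypothetical demo file written to the temp directory.
path = Path.join(System.tmp_dir!(), "stream_resource_demo.csv")
File.write!(path, "a,b\n1,2\n3,4\n")

lines = path |> LineReader.stream() |> Enum.to_list()
IO.inspect(lines)
```

Since this only reproduces what File.stream! already provides, it is unlikely to make the CSV reading faster by itself.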

Hey, but can we get the same data structure?

Right now it looks like this:

[
  ["2021", "Level 1", "99999", "All industries", "Dollars (millions)", "H01",
   "Total income", "Financial performance", "757,504",
   "ANZSIC06 divisions A-S (excluding classes K6330, L6711, O7552, O760, O771, O772, S9540, S9601, S9602, and S9603)"],
  ["2021", "Level 1", "99999", "All industries", "Dollars (millions)", "H04",
   "Sales, government funding, grants and subsidies", "Financial performance",
   "674,890",
   "ANZSIC06 divisions A-S (excluding classes K6330, L6711, O7552, O760, O771, O772, S9540, S9601, S9602, and S9603)"],
  ["2021", "Level 1", "99999", "All industries", "Dollars (millions)", "H05",
   "Interest, dividends and donations", "Financial performance", "49,593",
   "ANZSIC06 divisions A-S (excluding classes K6330, L6711, O7552, O760, O771, O772, S9540, S9601, S9602, and S9603)"],

Does it support converting each row to a map where the headers are the keys?


def read_csv_file() do
  File.stream!("annual.csv")
  # parse_stream/2 lives on a parser module such as NimbleCSV.RFC4180.
  |> NimbleCSV.RFC4180.parse_stream()
  |> Stream.map(fn [year | _] -> %{year: year} end)
  |> Enum.to_list()
end

Solved it.

Right, sorry, I forgot that the skip_headers option defaults to true. You need to set it to false.

For more information, see: NimbleCSV.parse_stream/2

You can convert them to maps yourself, but I don't see the value: you'll create and throw away a lot of objects and your code might actually become slower. The fact is, you'll get batches of CSV records like you requested and you can process them as needed.
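
If maps are still wanted, one way to build them by hand is to treat the first row as the keys and zip it with every following row. This is only a sketch: it works on already-parsed rows (lists of fields, as parse_stream with skip_headers: false would yield), the sample rows are invented, and `RowsToMaps` is a hypothetical module name.

```elixir
defmodule RowsToMaps do
  # Turns a stream of rows (lists of fields) into a stream of maps,
  # using the first row as the keys.
  def to_maps(rows) do
    Stream.transform(rows, nil, fn
      # First row: remember it as the headers and emit nothing.
      headers, nil -> {[], headers}
      # Every later row: pair each header with its field.
      row, headers -> {[Map.new(Enum.zip(headers, row))], headers}
    end)
  end
end

# Invented sample rows standing in for the parsed CSV.
rows = [
  ["Year", "Industry_code_NZSIOC"],
  ["2021", "AA"],
  ["2021", "BB"],
  ["2020", "AA"]
]

batches =
  rows
  |> RowsToMaps.to_maps()
  |> Stream.chunk_every(2)
  |> Enum.to_list()

IO.inspect(batches)
```

Everything stays lazy until Enum.to_list/1, so the rows are converted and batched in a single pass.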

Hey, I have already created a list of maps. Can you give me an example of creating batches?

Like here:

def read_csv_file() do
  File.stream!("annual.csv")
  |> NimbleCSV.RFC4180.parse_stream(skip_headers: false)
  |> Stream.map(fn [year | _] -> %{year: year} end)
  # Enum.chunk_every/2 already returns a list, so no Enum.to_list/1 is needed.
  |> Enum.chunk_every(100)
  |> process_data()
end

My plan was to create batches of 100 and pass them to the process_data() function, which does all the other work. But I need something like a background job for this, so that some chunks can be processed first.

The code that @Eiji recommended is good, use that.
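
For the "background job" part, one option worth considering is Task.async_stream/3, which hands each batch to its own task so that batches are processed while later ones are still being read. The sketch below is an illustration under assumptions: `BatchDemo` is a made-up module, `process_data/1` is a placeholder that only counts rows, and the input is generated data instead of the CSV stream.

```elixir
defmodule BatchDemo do
  # Placeholder for the real process_data/1; here it only counts the rows.
  def process_data(batch), do: length(batch)

  def run(rows) do
    rows
    |> Stream.chunk_every(100)
    # Each batch runs in its own task, at most 4 at a time.
    |> Task.async_stream(&process_data/1, max_concurrency: 4, timeout: 30_000)
    # Results arrive as {:ok, result} tuples, in input order by default.
    |> Enum.map(fn {:ok, result} -> result end)
  end
end

# Generated stand-in for the stream of maps built from the CSV.
rows = for n <- 1..250, do: %{year: Integer.to_string(n)}

results = BatchDemo.run(rows)
IO.inspect(results)
```

The max_concurrency and timeout values are arbitrary here and would need tuning for the real workload.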