I’m developing an ETL app that reads a large number of rows (millions) from a CSV file. There’s currently no Broadway producer for a file stream, so I wrote a custom GenStage producer based on the Broadway docs (see below). It works, but it’s slow, and rows seem to be read more than once. Am I doing this right?
```elixir
defmodule CsvReader.Stream do
  use GenStage

  def start_link() do
    GenStage.start_link(__MODULE__, [])
  end

  def init(_args) do
    {:producer, get_records()}
  end

  defp get_records() do
    "/dropbox/10Oct2010trans.csv"
    |> File.stream!()
    |> NimbleCSV.RFC4180.parse_stream()
  end

  def handle_demand(demand, state) when demand > length(state) do
    handle_demand(demand, state ++ get_records())
  end

  def handle_demand(demand, state) do
    {to_dispatch, remaining} = Enum.split(state, demand)
    {:noreply, to_dispatch, remaining}
  end
end
```
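
My current suspicion is that re-calling `get_records/0` whenever demand exceeds the buffered rows restarts the file stream from the top, which would explain both the duplicates and the slowness. A variant I’m experimenting with keeps the lazy stream itself as the producer state and only realizes `demand` rows per call. This sketch assumes the third-party `stream_split` hex package (`StreamSplit.take_and_drop/2`), which my code above does not use:

```elixir
defmodule CsvReader.LazyStream do
  use GenStage

  def start_link() do
    GenStage.start_link(__MODULE__, [])
  end

  def init(_args) do
    # State is the unevaluated stream; nothing is read from disk yet.
    {:producer, records_stream()}
  end

  defp records_stream() do
    "/dropbox/10Oct2010trans.csv"
    |> File.stream!()
    |> NimbleCSV.RFC4180.parse_stream()
  end

  def handle_demand(demand, stream) when demand > 0 do
    # Realize exactly `demand` rows; `rest` stays lazy, so the file
    # is never re-opened and no row is parsed twice. Once the file is
    # exhausted, `events` comes back shorter than `demand` (eventually
    # empty) -- end-of-stream handling is not shown here.
    {events, rest} = StreamSplit.take_and_drop(stream, demand)
    {:noreply, events, rest}
  end
end
```

I also noticed `GenStage.from_enumerable/2`, which wraps an enumerable (including a file stream) as a producer out of the box. Would either of these be the idiomatic approach, or is there a better pattern for Broadway?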