@dimitarvp, thank you!
Yesterday I was able to implement what I wanted with the solution you proposed.
In my process, I use the file_system library to check whether a new file has appeared in a specific directory; if so, the file is processed. Each line of the CSV file is correctly imported into a MySQL database using Ecto.
When setting up Ecto, it was suggested that I add my Repo to the children in `application.ex`, like this:
```elixir
children = [
  FileWatcher,
  Store # is my Repo
]
```
And in `config/config.exs`:

```elixir
import Config

config :file_watch_example, :ecto_repos, [Store]

config :file_watch_example, Store,
  database: "store",
  username: "user",
  password: "passwd",
  hostname: "db",
  pool_size: 100
```
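For reference, `Store` in `children` and in the config has to be the name of a module that calls `use Ecto.Repo`. A minimal sketch of such a module, assuming the `ecto_sql` and `myxql` packages (the MySQL adapter) are in the dependencies; the file name is hypothetical:

```elixir
# lib/store.ex (hypothetical file name)
defmodule Store do
  # otp_app tells the repo to read its settings from
  # `config :file_watch_example, Store, ...` in config/config.exs.
  use Ecto.Repo,
    otp_app: :file_watch_example,
    # Assumption: MySQL through the MyXQL driver.
    adapter: Ecto.Adapters.MyXQL
end
```

The module name is the only link between the three places: if the module were called `MyApp.Store`, the `children` entry and both `config` lines would have to say `MyApp.Store` too.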
In the Ecto documentation, however, the Repo lives under the application's namespace:

```elixir
children = [
  MyApp.Repo,
]
```
In my case, I tried naming it `MyApp.Store`, but that did not work for me.
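Some context on why the exact name matters: in Elixir a module name is just an atom, so `Store` and `MyApp.Store` are two unrelated names rather than a short and a long form of the same one. A small sketch (`MyApp.Demo` is a hypothetical module, for illustration only):

```elixir
# Module names are ordinary atoms prefixed with "Elixir.".
IO.inspect(Store == :"Elixir.Store")           # prints true
IO.inspect(MyApp.Store == :"Elixir.MyApp.Store") # prints true
# Two different atoms, so two different modules.
IO.inspect(Store == MyApp.Store)               # prints false

# Hypothetical module: `alias` lets code inside it write `Store`
# while actually meaning MyApp.Store.
defmodule MyApp.Demo do
  alias MyApp.Store

  def repo_name, do: Store
end

IO.inspect(MyApp.Demo.repo_name())             # prints MyApp.Store
```

So either name should work, as long as `defmodule`, `children` and the config all use the same one (or an `alias` makes the short form expand to it).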
So I have some questions:

- Must I use `MyApp.Store` instead of just `Store`?
- In my `config/config.exs` file, these two lines look almost the same. Is it possible to refactor them into one?

  ```elixir
  config :file_watch_example, :ecto_repos, [Store]
  config :file_watch_example, Store,
  ```

  If I understand correctly, the first one registers `Store` as an available Ecto Repo and the second one holds its database configuration. Is that correct?
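To see what the two calls actually produce, the config can be evaluated from a string with `Config.Reader.eval!/3` (available since Elixir 1.11). This sketch uses a trimmed-down version of the config above. Both calls write under the same `:file_watch_example` application, but under two different keys: `:ecto_repos`, which Ecto's Mix tasks read to find the repos, and `Store`, which the repo reads its connection options from via `otp_app`.

```elixir
# A trimmed-down copy of config/config.exs, evaluated from a string.
contents = """
import Config

config :file_watch_example, :ecto_repos, [Store]

config :file_watch_example, Store,
  database: "store",
  hostname: "db"
"""

# The path is not read from disk; it is only used to resolve relative
# imports and in error messages.
config = Config.Reader.eval!("config/config.exs", contents)
app_env = Keyword.fetch!(config, :file_watch_example)

# Key 1: the list of repos (read by Mix tasks such as mix ecto.create).
IO.inspect(Keyword.fetch!(app_env, :ecto_repos))
# Key 2: the connection options for Store (read by the repo via otp_app).
IO.inspect(Keyword.fetch!(app_env, Store))
```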
This is the code I ended up with, based on what you wrote:
```elixir
# Define a semicolon-separated parser under its own module name.
# NimbleCSV.RFC4180 is NimbleCSV's predefined comma-separated parser,
# so it is left untouched rather than aliased and redefined.
NimbleCSV.define(IngestCSV.Parser, separator: ";", escape: "\"")

defmodule IngestCSV do
  alias IngestCSV.Parser

  def load(path) do
    path
    |> Path.expand()
    |> File.stream!(read_ahead: 524_288)
    |> Parser.parse_stream()
    |> Task.async_stream(
      fn [name, bar_code, price, currency] ->
        # Do something with a single CSV record (here: insert it via Ecto).
        # If you just want to process all records and get a new list back,
        # use Stream.map and Enum.to_list at the end instead.
        FileWatchExample.Product.create_product(%{
          name: name,
          bar_code: bar_code,
          price: price,
          currency: currency
        })
      end,
      max_concurrency: 100,
      on_timeout: :kill_task,
      ordered: false
    )
    |> Stream.run()
  end
end
```
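`Stream.run/1` throws away every result, so if `create_product/1` returns `{:error, changeset}` (an assumption here, following the usual Ecto convention), failed inserts go unnoticed. A sketch of consuming the `Task.async_stream/3` results instead and counting them. `IngestSketch` and `fake_create_product/1` are hypothetical stand-ins so the example runs without a database:

```elixir
defmodule IngestSketch do
  # Hypothetical stand-in for FileWatchExample.Product.create_product/1,
  # assumed to return {:ok, product} or {:error, reason} like an Ecto insert.
  def fake_create_product(%{price: ""} = attrs), do: {:error, {:missing_price, attrs.name}}
  def fake_create_product(attrs), do: {:ok, attrs}

  # Runs the inserts concurrently and counts the outcomes instead of
  # discarding them with Stream.run/1.
  def load_rows(rows) do
    rows
    |> Task.async_stream(
      fn [name, bar_code, price, currency] ->
        fake_create_product(%{name: name, bar_code: bar_code, price: price, currency: currency})
      end,
      max_concurrency: 10,
      on_timeout: :kill_task,
      ordered: false
    )
    |> Enum.reduce(%{ok: 0, error: 0, exit: 0}, fn
      # The task finished and the insert succeeded.
      {:ok, {:ok, _product}}, acc -> Map.update!(acc, :ok, &(&1 + 1))
      # The task finished but the insert returned an error.
      {:ok, {:error, _reason}}, acc -> Map.update!(acc, :error, &(&1 + 1))
      # The task was killed after the timeout (on_timeout: :kill_task).
      {:exit, _reason}, acc -> Map.update!(acc, :exit, &(&1 + 1))
    end)
  end
end

IO.inspect(
  IngestSketch.load_rows([
    ["Apple", "123", "1.00", "EUR"],
    ["Pear", "456", "", "EUR"]
  ])
)
```

With `on_timeout: :kill_task`, a task that exceeds the default 5-second timeout is reported as `{:exit, :timeout}` in the stream instead of crashing the caller, which is why the reducer has a clause for it.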