I have a controller that takes a request with a CSV file and an email_id parameter.
It reads each line of the CSV file, processes the data, inserts it into the DB, and returns the processed data for each line from the DB.
I add this processed data to a list, dump it as iodata into a CSV binary, and send that CSV file as an attachment using the :bamboo library.
The problem I am facing is that as the file grows, the request takes longer to process, so I want to do the work asynchronously.
That is, when I get a /process data request, I want to respond to the client right away saying the file is being processed, do the job in the background, and send the mail when the job completes.
Below is the snippet of my code.
def call(csv_path, email) do
  csv_path
  |> read_csv()
  |> dump_to_csv()
  |> add_attachment()
  |> Email.create(email)
  |> Email.send_email()
  |> form_response()
end

def read_csv(csv_path) do
  csv_path
  |> File.stream!()
  |> MyParser.parse_stream()
  |> Stream.map(fn [arg1, arg2] -> %{prefix: arg1, url: arg2} end)
  |> Enum.flat_map(fn params -> url_convertor(params) end)
end

def url_convertor(%{url: url, prefix: prefix}) do
  with {:ok, %Route{} = route} <- MyApp.process_data(%{"prefix" => prefix, "url" => url}) do
    route
    |> MyApp.processed_data()
    |> add_to_list(route, prefix)
  else
    {:error, %{changes: changes, errors: _}} -> [[prefix, changes[:url], "INVALID_URL"]]
  end
end

def add_to_list(new_url, %{url: original_url}, prefix) do
  [[prefix, original_url, new_url]]
end

def dump_to_csv(list) do
  ([["PREFIX", "ORIGINAL_URL", "NEW_URL"]] ++ list)
  |> MyParser.dump_to_iodata()
  |> IO.iodata_to_binary()
end

def add_attachment(binary_data) do
  %Bamboo.Attachment{content_type: "text/csv", filename: "data.csv", data: binary_data}
end
That's the final goal. I meant that single snippet. It starts a single Task and then eventually tries to “add” the result of the call to a map. You cannot “add” to a map, though. You can only add numbers, and the BEAM only knows two kinds of numbers: floating point numbers and integers.
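To illustrate the point about maps, here is a minimal sketch. The map contents, the :status key and the add_one helper are made up for illustration and are not taken from the original snippet. Arithmetic on a map raises, while Map.put/3 returns a new map with the extra key.

```elixir
# Hypothetical stand-in for whatever value the awaited task returned.
result = %{email: "user@example.com"}

# Hypothetical helper; `+` is only defined for integers and floats.
add_one = fn value -> value + 1 end

# Calling it with a map raises ArithmeticError, which we rescue here.
outcome =
  try do
    add_one.(result)
  rescue
    ArithmeticError -> :cannot_add_to_a_map
  end

IO.inspect(outcome)

# To extend a map, use Map.put/3, which returns a new map.
updated = Map.put(result, :status, "processing")
IO.inspect(updated)
```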
Yes, that’s because CSVParser.call (and therefore Controller.create) still blocks by awaiting the task. As I wrote above, if you don’t care about the result, you can use Task.start_link/1. The controller in your case does not need to wait for the email to be sent, so in CSVParser you can do this:
def call(csv_path, email) do
  Task.start_link(fn -> do_async_task(csv_path, email) end)
end
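As a rough, standalone sketch of the difference this makes: the module name CSVParserSketch, the :processing return value and the body of do_async_task are assumptions for illustration only, with a sleep standing in for the real CSV processing and mail delivery. The caller gets its reply almost immediately while the slow work carries on in another process.

```elixir
defmodule CSVParserSketch do
  # Returns right away; the slow work continues in a separate, linked process.
  def call(csv_path, email) do
    {:ok, _pid} = Task.start_link(fn -> do_async_task(csv_path, email) end)
    :processing
  end

  # Hypothetical stand-in for the real pipeline (read CSV, build attachment, send mail).
  defp do_async_task(csv_path, email) do
    Process.sleep(200)
    IO.puts("mail for #{csv_path} sent to #{email}")
  end
end

# Measure how long the caller is blocked (in microseconds).
{elapsed_us, reply} =
  :timer.tc(fn -> CSVParserSketch.call("data.csv", "user@example.com") end)

IO.inspect({reply, div(elapsed_us, 1000)}, label: "reply and elapsed ms")

# Only needed so this standalone script does not exit before the task finishes.
Process.sleep(300)
```

In a controller, the value returned by call would be used to build the response, so the client hears back before the mail goes out.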
Also, controller functions should always return conn (as the error message says), so make sure your create action returns conn.
You need to pass an actually running supervisor into Task.Supervisor.start…
You probably passed an atom there that isn't the name of a running supervisor. Usually you start your supervisors in your Application module; they are then available at runtime and you can use them in your code.
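A minimal sketch of that setup, assuming a supervisor registered under the hypothetical name MyApp.TaskSupervisor. In a real application the child spec would go in the children list of the Application module's start/2. It is started inline here only so the script runs on its own.

```elixir
# Child spec for a named Task.Supervisor (the name is an assumption).
children = [
  {Task.Supervisor, name: MyApp.TaskSupervisor}
]

{:ok, _sup} = Supervisor.start_link(children, strategy: :one_for_one)

# The first argument must be the name (or pid) of a supervisor that is running.
parent = self()

{:ok, _task_pid} =
  Task.Supervisor.start_child(MyApp.TaskSupervisor, fn ->
    # Stand-in for the CSV processing and mail delivery.
    send(parent, {:done, "data.csv"})
  end)

received =
  receive do
    {:done, file} -> file
  after
    1_000 -> :timeout
  end

IO.inspect(received)

# An atom that no running supervisor is registered under makes the call exit
# with a :noproc reason instead of starting a task.
bad_call =
  try do
    Task.Supervisor.start_child(NotStarted.Supervisor, fn -> :ok end)
  catch
    :exit, reason -> {:exit, reason}
  end

IO.inspect(bad_call)
```

Unlike Task.start_link/1, a task started this way is not linked to the calling process, which tends to suit work that should outlive the request that triggered it.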