sahilpaudel
How to process data asynchronously in Elixir?
Hi Everyone,
I have controller that takes request with a CSV file & email_id in the parameter.
What is does each read through each line of the CSV file and process the data and insert into the DB and returns the processed data for each line from the DB.
I add this processed data into a list and dump it as iodata into CSV binary and send that CSV file as an attachment using :bamboo library.
The problem I am facing is as the size of the file grows the request is taking time to process so I want to do it asynchronously.
Like I get /process data request and then send response as it is being processed to the client and do the job in the backend and send the mail on completion of the job.
Below is the snippet of my code.
def call(csv_path, email) do
csv_path
|> read_csv
|> dump_to_csv
|> add_attachment
|> Email.create(email)
|> Email.send_email
|> form_response
end
def read_csv(csv_path) do
csv_path
|> File.stream!
|> MyParser.parse_stream
|> Stream.map(fn [arg1, arg2] -> %{prefix: arg1, url: arg2} end)
|> Enum.flat_map(fn(params) -> url_convertor(params) end)
end
def url_convertor(%{url: url, prefix: prefix}) do
with {:ok, %Route{} = route} <- MyApp.process_data(%{"prefix" => prefix, "url" => url}) do
route
|> MyApp.processed_data
|> add_to_list(route, prefix)
else
{:error, %{changes: changes, errors: _}} -> [[prefix, changes[:url], "INVALID_URL"]]
end
end
def add_to_list(new_url, %{url: original_url}, prefix) do
[[prefix, original_url, new_url] | []]
end
def dump_to_csv(list) do
[["PREFIX", "ORIGINAL_URL", "NEW_URL"]] ++ list
|> MyParser.dump_to_iodata()
|> IO.iodata_to_binary
end
def add_attachment(binary_data) do
%Bamboo.Attachment{content_type: "text/csv", filename: "data.csv", data: binary_data}
end
Library Used:
:nimble_csv, bamboo_smtp
Can you help me how to make this process async.
Marked As Solved
lucaong
Yes, that’s because CSVParser.call (and therefore Controller.create) still blocks by awaiting the task. As I wrote above, if you don’t care about the result, you can use Task.start_link/1. The controller in your case does not need to wait for the email to be sent, so in CSVParser you can do this:
def call
Task.start_link(fn -> do_async_task(csv_path, email) end)
end
Also, controller functions should always return conn (as the error message says). So make sure you return conn in your create method:
def create(conn, %{"file" => %Plug.Upload{path: fullpath, content_type: "text/csv"}, "email" => email}) do
CSVParser.call(fullpath, email)
conn |> render_json_response({:ok, %{message: "Request Initiated"}})
end
Also Liked
lucaong
If you want to execute the task in the background as a side effect, without waiting for the result, you can use either Task.start_link/{1,3} or Task.Supervisor.start_child/{3,5}.
Another common way to perform computation asynchronously in the background is to use a background job library like Oban.
NobbZ
Task.async/1 will start the job in a new process.
Task.await/1 will wait up to 5 seconds and then either return the result or crash the current process.
So the following is pretty similar to spawning many workers and then joining them at the end:
email_addresses
|> Enum.map(fn email ->
Task.async(fn -> do_async_task(csv_path, email))
end)
|> Enum.each(&Task.await/1)
Make sure you do not need more than 5 seconds to send each individual email though ;D
You can also increase the timeout by using Task.await/2 instead:
email_addresses
|> Enum.map(fn email ->
Task.async(fn -> do_async_task(csv_path, email))
end)
|> Enum.each(&Task.await(&1, time_out))
Here you can see an async sleep in “action”:
iex(2)> :timer.tc(fn ->
...(2)> Enum.map(1..10, fn t ->
...(2)> Task.async(fn ->
...(2)> Process.sleep(t * 1000)
...(2)> end)
...(2)> end) |> Enum.each(&Task.await(&1, :infinity))
...(2)> end)
{10002421, :ok}
sahilpaudel
Thank you @lucaong this is what I was looking for as of now.
I may have to deal with the result later on but this is perfect for now.
Also in your spare time do let me know how can we go about this by using Task.Supervisor because it gave me no process alive whenever I tried it.
And Thank you @NobbZ for all the insights can’t mark both as answers but your answer really helped me understand the stuff I was trying to do.
Popular in Questions
Other popular topics
Categories:
Sub Categories:
Forums
Popular Tags
- #ecto
- #liveview
- #troubleshooting
- #learning-elixir
- #deployment
- #library
- #erlang
- #testing
- #genserver
- #mix
- #absinthe
- #remote-other
- #otp
- #plug
- #how-to-question
- #macros
- #postgres
- #channels
- #elixirconf
- #exunit
- #discussion
- #javascript
- #code-sync
- #podcasts
- #onsite
- #dialyzer
- #docker
- #authentication
- #umbrella
- #full-time-contract
- #podcasts-by-brainlid
- #ecto-query
- #elixir-ls
- #phoenix_html
- #iex
- #blog-post
- #graphql
- #genstage
- #ai
- #websockets
- #supervisor
- #advent-of-code
- #elixirconf-us
- #distillery
- #processes
- #forms
- #api
- #metaprogramming
- #security
- #performance








