sahilpaudel

How to process data asynchronously in Elixir?

Hi Everyone,

I have a controller that takes a request with a CSV file and an email_id as parameters.
It reads through each line of the CSV file, processes the data, inserts it into the DB, and returns the processed data for each line from the DB.

I add this processed data to a list, dump it as iodata into a CSV binary, and send that CSV file as an attachment using the :bamboo library.

The problem I am facing is that as the file grows, the request takes longer to process, so I want to do the work asynchronously.

That is, when I get a /process data request, I want to respond to the client right away to say the file is being processed, do the job in the background, and send the mail once the job completes.

Below is the snippet of my code.

  def call(csv_path, email) do
    csv_path
      |> read_csv
      |> dump_to_csv
      |> add_attachment
      |> Email.create(email)
      |> Email.send_email
      |> form_response
  end

  def read_csv(csv_path) do
      csv_path
        |> File.stream!
        |> MyParser.parse_stream
        |> Stream.map(fn [arg1, arg2] -> %{prefix: arg1, url: arg2} end)
        |> Enum.flat_map(fn(params) -> url_convertor(params) end)
  end

  def url_convertor(%{url: url, prefix: prefix}) do
    with {:ok, %Route{} = route} <- MyApp.process_data(%{"prefix" => prefix, "url" => url}) do
      route
        |> MyApp.processed_data
        |> add_to_list(route, prefix)
    else
      {:error, %{changes: changes, errors: _}} -> [[prefix, changes[:url], "INVALID_URL"]] 
    end
  end

  def add_to_list(new_url, %{url: original_url}, prefix) do
    [[prefix, original_url, new_url] | []]
  end

  def dump_to_csv(list) do
    [["PREFIX", "ORIGINAL_URL", "NEW_URL"]] ++ list
      |> MyParser.dump_to_iodata()
      |> IO.iodata_to_binary
  end

  def add_attachment(binary_data) do
    %Bamboo.Attachment{content_type: "text/csv", filename: "data.csv", data: binary_data}
  end

Libraries used:

:nimble_csv, :bamboo_smtp

Can you help me make this process asynchronous?

Marked As Solved

lucaong

Yes, that’s because CSVParser.call (and therefore Controller.create) still blocks by awaiting the task. As I wrote above, if you don’t care about the result, you can use Task.start_link/1. The controller in your case does not need to wait for the email to be sent, so in CSVParser you can do this:

def call(csv_path, email) do
  Task.start_link(fn -> do_async_task(csv_path, email) end)
end

Also, controller functions should always return conn (as the error message says). So make sure you return conn in your create method:

def create(conn, %{"file" => %Plug.Upload{path: fullpath, content_type: "text/csv"}, "email" => email}) do
  CSVParser.call(fullpath, email)
  conn |> render_json_response({:ok, %{message: "Request Initiated"}})
end
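
As a rough illustration of why this no longer blocks, here is a minimal sketch that can be run as a script. The module name CSVParserSketch, the extra notify_pid argument, and the 200 ms sleep are assumptions made for the demo; they stand in for the real CSV and email pipeline and are not part of the original code.

```elixir
defmodule CSVParserSketch do
  # Hypothetical stand-in for the slow CSV + email pipeline from the question.
  # notify_pid exists only so this demo can observe when the job finishes.
  def do_async_task(_csv_path, email, notify_pid) do
    Process.sleep(200)
    send(notify_pid, {:email_sent, email})
  end

  # Returns {:ok, pid} right away; the work continues in the linked task process.
  def call(csv_path, email, notify_pid) do
    Task.start_link(fn -> do_async_task(csv_path, email, notify_pid) end)
  end
end

# Measure how long the caller is blocked by call/3.
{elapsed_us, {:ok, _pid}} =
  :timer.tc(fn -> CSVParserSketch.call("data.csv", "user@example.com", self()) end)

IO.puts("call/3 returned after #{elapsed_us} microseconds")

# Wait for the background job only to show that it really ran.
receive do
  {:email_sent, email} -> IO.puts("background job finished for #{email}")
after
  1_000 -> IO.puts("background job did not finish in time")
end
```

The caller gets control back long before the 200 ms of simulated work is done, which is the behaviour the controller needs.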

Also Liked

lucaong

If you want to execute the task in the background as a side effect, without waiting for the result, you can use either Task.start_link/{1,3} or Task.Supervisor.start_child/{3,5}.
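
A minimal sketch of the Task.Supervisor variant, assuming a supervisor registered under the hypothetical name MyApp.TaskSupervisor. The supervisor has to be running before start_child is called; calling it with a name that no process is registered under fails. The sleep and the message sent to the parent are stand-ins for the real job.

```elixir
# In an application this would normally be a child of the supervision tree, e.g.
#   children = [{Task.Supervisor, name: MyApp.TaskSupervisor}]
# It is started by hand here so the sketch runs as a standalone script.
{:ok, _sup} = Task.Supervisor.start_link(name: MyApp.TaskSupervisor)

parent = self()

# start_child/2 returns immediately; the task runs under the supervisor
# and is not linked to the calling process.
{:ok, pid} =
  Task.Supervisor.start_child(MyApp.TaskSupervisor, fn ->
    # Stand-in for do_async_task(csv_path, email)
    Process.sleep(100)
    send(parent, :job_done)
  end)

# Wait for the message only to demonstrate that the job ran.
result =
  receive do
    :job_done -> :job_done
  after
    1_000 -> :timeout
  end

IO.inspect({pid, result})
```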

Another common way to perform computation asynchronously in the background is to use a background job library like Oban.

NobbZ

Task.async/1 will start the job in a new process.

Task.await/1 will wait up to 5 seconds and then either return the result or crash the current process.

So the following is pretty similar to spawning many workers and then joining them at the end:

email_addresses
|> Enum.map(fn email ->
  Task.async(fn -> do_async_task(csv_path, email) end)
end)
|> Enum.each(&Task.await/1)

Make sure you do not need more than 5 seconds to send each individual email though ;D

You can also increase the timeout by using Task.await/2 instead:

email_addresses
|> Enum.map(fn email ->
  Task.async(fn -> do_async_task(csv_path, email) end)
end)
|> Enum.each(&Task.await(&1, time_out))
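
For comparison, a small sketch of the same spawn-then-join pattern using Task.await_many/2 (available since Elixir 1.11), which takes one timeout for the whole list and returns the results in order. This swaps in a different function from the one used above; the slow function and the millisecond values are made up for the demo.

```elixir
# Hypothetical stand-in for the real work: sleep, then return the input.
slow = fn ms ->
  Process.sleep(ms)
  ms
end

# Spawn one task per input.
tasks = Enum.map([50, 100, 150], fn ms -> Task.async(fn -> slow.(ms) end) end)

# Join all tasks; exits the caller if they have not all replied within 1 second.
results = Task.await_many(tasks, 1_000)

IO.inspect(results)
```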

Here you can see an async sleep in “action”:

iex(2)> :timer.tc(fn ->
...(2)>   Enum.map(1..10, fn t ->
...(2)>     Task.async(fn ->
...(2)>       Process.sleep(t * 1000)
...(2)>     end)
...(2)>   end) |> Enum.each(&Task.await(&1, :infinity))
...(2)> end)
{10002421, :ok}

sahilpaudel

Thank you @lucaong, this is what I was looking for.
I may have to deal with the result later on, but this is perfect for now.

Also, when you have some spare time, please let me know how to do this with Task.Supervisor, because it gave me a "no process alive" error whenever I tried it.

And thank you @NobbZ for all the insights. I can't mark both as answers, but your answer really helped me understand what I was trying to do.
