How to process data asynchronously in Elixir?

Hi Everyone,

I have a controller that takes a request with a CSV file and an email_id as parameters.
What it does is read each line of the CSV file, process the data, insert it into the DB, and return the processed data for each line from the DB.

I add this processed data to a list, dump it as iodata into a CSV binary, and send that CSV file as an attachment using the :bamboo library.

The problem I am facing is that as the size of the file grows, the request takes longer to process, so I want to do it asynchronously.

For example, when I get the /process data request, I want to send a response to the client saying the data is being processed, do the job in the background, and send the mail once the job completes.

Below is a snippet of my code.

def call(csv_path, email) do
    csv_path
      |> read_csv
      |> dump_to_csv
      |> add_attachment
      |> Email.create(email)
      |> Email.send_email
      |> form_response
  end

  def read_csv(csv_path) do
      csv_path
        |> File.stream!
        |> MyParser.parse_stream
        |> Stream.map(fn [arg1, arg2] -> %{prefix: arg1, url: arg2} end)
        |> Enum.flat_map(fn(params) -> url_convertor(params) end)
  end

  def url_convertor(%{url: url, prefix: prefix}) do
    with {:ok, %Route{} = route} <- MyApp.process_data(%{"prefix" => prefix, "url" => url}) do
      route
        |> MyApp.processed_data
        |> add_to_list(route, prefix)
    else
      {:error, %{changes: changes, errors: _}} -> [[prefix, changes[:url], "INVALID_URL"]] 
    end
  end

  def add_to_list(new_url, %{url: original_url}, prefix) do
    [[prefix, original_url, new_url] | []]
  end

  def dump_to_csv(list) do
    [["PREFIX", "ORIGINAL_URL", "NEW_URL"]] ++ list
      |> MyParser.dump_to_iodata()
      |> IO.iodata_to_binary
  end

  def add_attachment(binary_data) do
    %Bamboo.Attachment{content_type: "text/csv", filename: "data.csv", data: binary_data}
  end

Libraries used:

:nimble_csv, :bamboo_smtp

Can you help me make this process async?

You could try using Task.async

Hi @cpgo, can you please help me out here, as this is new to me?

I updated my code to:

def call(csv_path, email) do
    task = Task.async(fn -> do_async_task(csv_path, email) end)
    res = send_status() 
    res + Task.await(task)
  end

  def send_status() do
    %{response: %{message: "Request Initiated"}}
  end

  def do_async_task(csv_path, email) do
    csv_path
      |> read_csv
      |> dump_to_csv
      |> add_attachment
      |> Email.create(email)
      |> Email.send_email
  end

But still no luck. Can you guys help me here?

res will be a map; you can’t have a map on the left-hand side of +/2.

What are you trying to achieve?

This is what I actually want to do.

That’s the final goal, but I meant that single snippet. It starts a single Task and then eventually tries to “add” the result of the call to a map. You cannot “add” to a map, though. You can only add numbers, and the BEAM only knows about two types of numbers: floating-point numbers and integers.
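
For illustration, this is what happens when you try to add a map to a number (the exact error wording depends on your Elixir version):

iex> %{response: %{message: "Request Initiated"}} + 1
** (ArithmeticError) bad argument in arithmetic expression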

Yes, it was wrong of me to do that; I was just trying to mimic what was in the Task.async documentation.

But that didn’t work, and I am totally clueless about how to go about it using Task.async.

Can you please help me here?

Task.async/1 will start the job in a new process.

Task.await/1 will wait up to 5 seconds and then either return the result or crash the current process.

So the following is pretty similar to spawning many workers and then joining them at the end:

email_addresses
|> Enum.map(fn email ->
  Task.async(fn -> do_async_task(csv_path, email) end)
end)
|> Enum.each(&Task.await/1)

Make sure you do not need more than 5 seconds to send each individual email though ;D

You can also increase the timeout by using Task.await/2 instead:

email_addresses
|> Enum.map(fn email ->
  Task.async(fn -> do_async_task(csv_path, email) end)
end)
|> Enum.each(&Task.await(&1, time_out))

Here you can see an async sleep in “action”:

iex(2)> :timer.tc(fn ->
...(2)>   Enum.map(1..10, fn t ->
...(2)>     Task.async(fn ->
...(2)>       Process.sleep(t * 1000)
...(2)>     end)
...(2)>   end) |> Enum.each(&Task.await(&1, :infinity))
...(2)> end)
{10002421, :ok}

Thank you @NobbZ, I got the gist of how Task.async and Task.await work here.

I am just starting with Elixir, so I have a request: how can I achieve the task below?

When the request comes to

call(csv_path, email)

It returns %{response: %{message: "Request Initiated"}}

And carries on executing the function below in the background.

Can I do that?

Also, I want to spawn a new worker for each request to this route.

Thanks.

If you want to execute the task in the background as a side effect, without waiting for the result, you can use either Task.start_link/{1,3} or Task.Supervisor.start_child/{3,5}.
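
For example, a minimal sketch of both options (the supervisor name MyApp.TaskSupervisor is an assumption; it has to be running under your supervision tree):

# fire-and-forget, linked to the caller: if the task crashes, the caller crashes too
Task.start_link(fn -> do_async_task(csv_path, email) end)

# fire-and-forget under a dedicated task supervisor (assumed name),
# so a crash in the task does not take down the caller
Task.Supervisor.start_child(MyApp.TaskSupervisor, fn ->
  do_async_task(csv_path, email)
end)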

Another common way to perform computation asynchronously in the background is to use a background job library like Oban.


Hi @NobbZ, @lucaong

I have used the following approach.

Controller

def create(conn, %{"file" => %Plug.Upload{path: fullpath, content_type: "text/csv"}, "email" => email}) do
    conn
      |> render_json_response({:ok, %{message: "Request Initiated"}})
    CSVParser.call(fullpath, email)
  end

CSVParser.ex

def call(csv_path, email) do
    task = Task.async(fn -> do_async_task(csv_path, email) end)
    Task.await(task, :infinity)
  end

  def do_async_task(csv_path, email) do
    csv_path
      |> read_csv
      |> dump_to_csv
      |> add_attachment
      |> Email.create(email)
      |> Email.send_email
  end

But if I hit the API with simultaneous requests, the other requests do not return “Request Initiated”; they just keep waiting.

Also, I am getting this error message:

expected action/2 to return a Plug.Conn, all plugs must receive a connection (conn) and return a connection.

Yes, that’s because CSVParser.call (and therefore Controller.create) still blocks by awaiting the task. As I wrote above, if you don’t care about the result, you can use Task.start_link/1. The controller in your case does not need to wait for the email to be sent, so in CSVParser you can do this:

def call(csv_path, email) do
  Task.start_link(fn -> do_async_task(csv_path, email) end)
end

Also, controller functions should always return conn (as the error message says). So make sure you return conn from your create function:

def create(conn, %{"file" => %Plug.Upload{path: fullpath, content_type: "text/csv"}, "email" => email}) do
  CSVParser.call(fullpath, email)
  conn |> render_json_response({:ok, %{message: "Request Initiated"}})
end

Thank you @lucaong, this is what I was looking for.
I may have to deal with the result later on, but this is perfect for now.

Also, in your spare time, do let me know how to go about this using Task.Supervisor, because it gave me a “no process alive” error whenever I tried it.

And thank you @NobbZ for all the insights. I can’t mark both as answers, but your answer really helped me understand what I was trying to do.


You need to pass an actually running supervisor into Task.Supervisor.start…

You probably put a name in there that wasn’t a running supervisor. Usually you start your supervisors in your Application module; then they are available at runtime and you can use them in your code.
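
A rough sketch of that setup (the MyApp module and supervisor names are placeholders, not from your app):

# in lib/my_app/application.ex (names here are assumed placeholders)
def start(_type, _args) do
  children = [
    # ...your existing children (Repo, Endpoint, etc.)
    {Task.Supervisor, name: MyApp.TaskSupervisor}
  ]

  Supervisor.start_link(children, strategy: :one_for_one, name: MyApp.Supervisor)
end

# then, from CSVParser or anywhere else at runtime:
Task.Supervisor.start_child(MyApp.TaskSupervisor, fn ->
  do_async_task(csv_path, email)
end)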