Loading resources concurrently

One common pattern I encounter in the codebases that our team maintains is loading several resources concurrently. Often these resources depend on each other, so resource A has to be loaded before resource B can be.

Here is a toy example where one would load information about a user, an organization, a project, and a list of permissions. To write this, one could structure the code like in the following example:

with(
  load_org_task <- Task.async(fn -> load_org(org_id) end),
  load_user_task <- Task.async(fn -> load_user(user_id) end),
  load_project_task <- Task.async(fn -> load_project(user_id) end),
  {:ok, org} <- Task.await(load_org_task),
  {:ok, user} <- Task.await(load_user_task),
  {:ok, project} <- Task.await(load_project_task),
  load_artifact_list_task <- Task.async(fn -> list_artifacts(org_id) end),
  load_permissions_task <- Task.async(fn -> load_permissions(project) end),
  {:ok, artifact_list} <- Task.await(load_artifact_list_task),
  {:ok, permissions} <- Task.await(load_permissions_task)
) do
  render(org, user, project, artifact_list, permissions)
else
  {:error, :not_found} -> ...
end

While the above example works, I feel it is not elegant enough. If you add timeouts, logs, metrics, and error handling to the above, the code can become long and, if you are not careful, a bit confusing.

I’m investigating approaches to how we could make this pattern streamlined and a bit more elegant.
I have some ideas, for example, the one in this PR: [draft] Loader by shiroyasha · Pull Request #23 · renderedtext/elixir-util · GitHub, but I’m also curious how other teams approach this or if the idea that I presented in the PR has merit and solves a real problem.

Given the above pattern, what would be your approach to cleaning it up and making it tighter and nicer?

This looks much cleaner to me:

[org, user, artifacts, {project, permissions}] =
  [
    fn -> load_org(org_id) end,
    fn -> load_user(user_id) end,
    fn -> list_artifacts(org_id) end,
    fn ->
      project = load_project(user_id)
      {project, load_permissions(project)}
    end
  ]
  |> Enum.map(&Task.async/1)
  |> Task.await_many()

if Enum.all?([org, user, artifacts, project, permissions], &(elem(&1, 0) == :ok)) do
  render(org, user, project, artifacts, permissions)
else
  ...
end

If only one task (load_permissions) depends on another task (load_project), then we lose nothing by merging them into one.

With the above, we can keep all the functions in a list and map over it to get a list of tasks. After that, all we need to do is await every task in the list, which takes a single function call.

I have added a simple one-line check that the first element of every result tuple is :ok, but you can use pattern matching in with as well.

As far as I know, Task.await_many/1 awaits all the tasks together under one shared timeout, so it is tidier than several separate Task.await/1 calls.
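To illustrate the awaiting step, here is a minimal sketch. The `AwaitManyDemo` module, the `slow/2` helper and the sleep durations are made up for the example. The tasks start running as soon as Task.async/1 is called; Task.await_many/2 only collects the replies, with one timeout covering the whole batch.

```elixir
defmodule AwaitManyDemo do
  # Hypothetical stand-in for a real loader: sleeps, then returns {:ok, value}.
  def slow(value, ms) do
    Process.sleep(ms)
    {:ok, value}
  end

  def run do
    tasks = [
      Task.async(fn -> slow(:org, 30) end),
      Task.async(fn -> slow(:user, 10) end),
      Task.async(fn -> slow(:artifacts, 20) end)
    ]

    # One call, one timeout (1_000 ms) shared by the whole batch.
    # Results come back in task order, not in completion order.
    Task.await_many(tasks, 1_000)
  end
end

AwaitManyDemo.run()
# => [{:ok, :org}, {:ok, :user}, {:ok, :artifacts}]
```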

Hey @shiroyasha,

Check out Sage:

Here are some arguments for using it:

It’s like Ecto.Multi but across business logic and third-party APIs.

Features

  • Transaction retries;
  • Asynchronous transactions with timeout;
  • Retries with exponential backoff and jitter;
  • Easy-to-write circuit breakers;
  • Code that is clean and easy to test;
  • Low cost of integration in existing code base and low performance overhead;
  • Ability to not lock the database with long running transactions;
  • Extensibility - write your own handler for critical errors or metric collector to measure how much time each step took.

Goals of the Sage project:

  • Become a de facto tool to run distributed transactions in the Elixir world;
  • Stay simple to use and small to maintain: less code - less bugs;
  • Educate people how to run distributed transactions pragmatically.

Problematic code:

defmodule WithExample do
  def create_and_subscribe_user(attrs) do
    Repo.transaction(fn ->
      with {:ok, user} <- create_user(attrs),
           {:ok, plans} <- fetch_subscription_plans(attrs),
           {:ok, charge} <- charge_card(user, subscription),
           {:ok, subscription} <- create_subscription(user, plan, attrs),
           {:ok, _delivery} <- schedule_delivery(user, subscription, attrs),
           {:ok, _receipt} <- send_email_receipt(user, subscription, attrs),
           {:ok, user} <- update_user(user, %{subscription: subscription}) do
        acknowledge_job(opts)
      else
        {:error, {:charge_failed, _reason}} ->
          # First problem: charge is not available here
          :ok = refund(charge)
          reject_job(opts)

        {:error, {:create_subscription, _reason}} ->
          # Second problem: growing list of compensations
          :ok = refund(charge)
          :ok = delete_subscription(subscription)
          reject_job(opts)

        # Third problem: how to decide when we should be sending another email or
        # at which stage we've failed?

        other ->
          # Will rollback transaction on all other errors
          :ok = ensure_deleted(fn -> refund(charge) end)
          :ok = ensure_deleted(fn -> delete_subscription(subscription) end)
          :ok = ensure_deleted(fn -> delete_delivery_from_schedule(delivery) end)
          reject_job(opts)

          other
      end
    end)
  end

  defp ensure_deleted(cb) do
    case cb.() do
      :ok -> :ok
      {:error, :not_found} -> :ok
    end
  end
end

Sage’s Solution

defmodule SageExample do
  import Sage
  require Logger

  @spec create_and_subscribe_user(attrs :: map()) :: {:ok, last_effect :: any(), all_effects :: map()} | {:error, reason :: any()}
  def create_and_subscribe_user(attrs) do
    new()
    |> run(:user, &create_user/2)
    |> run(:plans, &fetch_subscription_plans/2, &subscription_plans_circuit_breaker/3)
    |> run(:subscription, &create_subscription/2, &delete_subscription/3)
    |> run_async(:delivery, &schedule_delivery/2, &delete_delivery_from_schedule/3)
    |> run_async(:receipt, &send_email_receipt/2, &send_excuse_for_email_receipt/3)
    |> run(:update_user, &set_plan_for_a_user/2)
    |> finally(&acknowledge_job/2)
    |> transaction(SageExample.Repo, attrs)
  end

  # Transaction behaviour:
  # @callback transaction(attrs :: map()) :: {:ok, last_effect :: any(), all_effects :: map()} | {:error, reason :: any()}

  # Compensation behaviour:
  # @callback compensation(
  #             effect_to_compensate :: any(),
  #             effects_so_far :: map(),
  #             attrs :: any()
  #           ) :: :ok | :abort | {:retry, retry_opts :: Sage.retry_opts()} | {:continue, any()}

  def create_user(_effects_so_far, %{"user" => user_attrs}) do
    %SageExample.User{}
    |> SageExample.User.changeset(user_attrs)
    |> SageExample.Repo.insert()
  end

  def fetch_subscription_plans(_effects_so_far, _attrs) do
    {:ok, _plans} = SageExample.Billing.APIClient.list_plans()
  end

  # If we failed to fetch plans, let's continue with cached ones
  def subscription_plans_circuit_breaker(_effect_to_compensate, _effects_so_far, _attrs) do
    {:continue, [%{"id" => "free", "total" => 0}, %{"id" => "standard", "total" => 4.99}]}
  end

  def create_subscription(%{user: user}, %{"subscription" => subscription}) do
    {:ok, subscription} = SageExample.Billing.APIClient.subscribe_user(user, subscription["plan"])
  end

  def delete_subscription(_effect_to_compensate, %{user: user}, _attrs) do
    :ok = SageExample.Billing.APIClient.delete_all_subscriptions_for_user(user)
    # We want to apply forward compensation from :subscription stage for 5 times
    {:retry, retry_limit: 5, base_backoff: 10, max_backoff: 30_000, enable_jitter: true}
  end

  # .. other transaction and compensation callbacks

  def acknowledge_job(:ok, attrs) do
    Logger.info("Successfully created user #{attrs["user"]["email"]}")
  end

  def acknowledge_job(_error, attrs) do
    Logger.warning("Failed to create user #{attrs["user"]["email"]}")
  end
end

Result

Along with readable code, you get:

  • Reasonable guarantees that all transaction steps are completed or all failed steps are compensated;
  • Code which is much simpler and easier to test;
  • Retries, circuit breaking and asynchronous requests out of the box;
  • Declarative way to define your transactions and run them.

P.S. I am not super experienced with Elixir, but this library seemed useful so I kept it in my bookmarks.


@Eiji @derpycoder, thank you guys for the response. Both options are a good improvement over the original code snippet.


For @Eiji’s solution, the things I’m not sure how to approach are:
1. Deeper nested DAG dependencies. For example:

a --> b
   --> c  --> d
          --> e --> f

2. Error handling, or knowing at the end of the operation which resource failed to load and what the reason was.


For @derpycoder, nice example! I feel it is a bit more suited for resource creation than resource loading, but the patterns in the code can be reused and streamlined for loading.

Honestly, it looks overcomplicated to me, and I would first try to refactor the app so that it does not fetch “half of the database”. I’m not sure what you are trying to render, but in most cases you could just simplify your templates.

If you really need to load nested data concurrently you can write something like:

defmodule Example do
  def sample(list) do
    {children, list} = organise_nested(list)
    list = [children | organise_flat(list)]
    Enum.reduce(list, %{}, &load_many/2)
  end

  defp load_many(list, acc) do
    list
    |> Enum.map(&Task.async(fn -> {&1, load(&1, acc)} end))
    |> Task.await_many()
    |> Map.new()
    |> Map.merge(acc)
  end

  defp load(:project, %{permissions: _permissions}) do
    # …
  end

  defp load(:org, _acc) do
    # …
  end

  defp load(:user, _acc) do
    # …
  end

  defp load(:artifacts, _acc) do
    # …
  end

  defp organise_flat(list) do
    {left, right} = Enum.split_with(list, &is_atom/1)

    if right == [] do
      [left]
    else
      right =
        Enum.map(right, fn {parent, children} ->
          children = Enum.reject(children, &(&1 in left))
          if children == [], do: parent, else: {parent, children}
        end)

      [left | organise_flat(right)]
    end
  end

  defp organise_nested(list) do
    {list, children} = organise_nested(list, [])
    list = Enum.reverse(list)
    children = children |> Enum.reverse() |> Enum.uniq()
    {children, list}
  end

  defp organise_nested([], list), do: {[], list}

  defp organise_nested([head | tail], list) when is_atom(head) do
    result = organise_nested(tail, [head | list])
    if is_list(result), do: {[], result}, else: result
  end

  defp organise_nested([{parent, children} | tail], list) do
    children_list =
      children
      |> Enum.reduce([], fn
        {child, _child_children}, acc -> [child | acc]
        _child, acc -> acc
      end)
      |> Enum.reverse()

    parent_result = if children_list == [], do: parent, else: {parent, children_list}
    {nested_result, list} = organise_nested(children, list)
    {tail_result, list} = organise_nested(tail, list)
    {tail_result ++ [parent_result | nested_result], list}
  end
end

With this you would have 4 calls to Task.await_many/1:

  1. Load b, d and f as they are leaves
  2. Load e as it requires f
  3. Load c as it requires e
  4. Load a as it requires c

The downside of this is that e would wait for b and d as well as f. However, if you rewrote this code to work on each nested level, you would have a similar problem: b would then wait for d, e and f even though it does not require anything.
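The four waves listed above can also be derived from a plain dependency map. This is a minimal sketch and not the code from the post: the `Waves` module and the map shape (name => list of names it depends on) are assumptions made for illustration.

```elixir
defmodule Waves do
  # deps: %{name => [names it depends on]}. Returns a list of waves;
  # everything in a wave only depends on names from earlier waves.
  def plan(deps), do: plan(deps, MapSet.new(), [])

  defp plan(deps, _done, waves) when map_size(deps) == 0, do: Enum.reverse(waves)

  defp plan(deps, done, waves) do
    # A name is ready when all of its dependencies are already done.
    ready =
      for {name, needs} <- deps, Enum.all?(needs, &MapSet.member?(done, &1)), do: name

    if ready == [], do: raise(ArgumentError, "cyclic or missing dependency")

    ready = Enum.sort(ready)
    plan(Map.drop(deps, ready), MapSet.union(done, MapSet.new(ready)), [ready | waves])
  end
end

Waves.plan(%{a: [:b, :c], b: [], c: [:d, :e], d: [], e: [:f], f: []})
# => [[:b, :d, :f], [:e], [:c], [:a]]
```

Each inner list could then be handed to one Task.await_many/1 call, which reproduces the waiting behaviour described above, including e waiting for b and d.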

If you want to go even further and fix that, you would need to replace this code:

    list = [children | organise_flat(list)]
    Enum.reduce(list, %{}, &load_many/2)

with a call to your custom function like:

# notice no organise_flat/1 call here
# children looks like: [:b, :d, :f]
# list looks like: [:e, {:c, [:e]}, {:a, [:c]}]
custom_await_many(children, list)

You would need to look at the source of Task.await_many/1 and rewrite it so that:

  1. The first argument, children (the leaves), is turned into tasks with Task.async/1 calls, and those tasks are added to awaiting
  2. When the receive block handles a reply such as {:project, project}, you call Map.put/3 on replies and, if that reply was the last one an entry was waiting for, add more tasks to awaiting, like:
{child_name, result} = reply
replies = Map.put(replies, child_name, result)

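{unblocked, blocked} =
  Enum.reduce(blocked, {[], []}, fn
    # the reply was the last dependency this entry was waiting for
    {name, [^child_name]}, {unblocked, blocked} ->
      {[name | unblocked], blocked}

    # otherwise drop the loaded child from the pending dependencies
    {name, children}, {unblocked, blocked} ->
      children = Enum.reject(children, &(&1 == child_name))
      {unblocked, [{name, children} | blocked]}
  end)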

# now change every unblocked to task using replies
# go back to `receive` block waiting for other replies


It’s the same in all cases: if a load function returns an error, all you need to do is stop working. As posted above, a simple condition like:

failed_replies = Enum.reject(replies, &(elem(&1, 0) == :ok))

if failed_replies == [] do
  # …
else
  # …
end

and the rest would be done by pattern matching. It’s as simple in the reduce function as it is when writing a custom await_many.
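For the earlier question of which resource failed and why, one option is to keep the name next to each result. A minimal sketch: the `LoadReport` module, the default timeout and the loader functions are hypothetical, and every loader is assumed to return either {:ok, value} or {:error, reason}.

```elixir
defmodule LoadReport do
  # loaders: keyword list of name => zero-arity function returning
  # {:ok, value} or {:error, reason} (any other shape will crash below).
  def load_all(loaders, timeout \\ 5_000) do
    results =
      loaders
      |> Enum.map(fn {name, fun} -> Task.async(fn -> {name, fun.()} end) end)
      |> Task.await_many(timeout)

    # Split on the tag of the inner result, keeping the resource name.
    {oks, errors} = Enum.split_with(results, &match?({_name, {:ok, _}}, &1))

    if errors == [] do
      {:ok, Map.new(oks, fn {name, {:ok, value}} -> {name, value} end)}
    else
      {:error, Map.new(errors, fn {name, {:error, reason}} -> {name, reason} end)}
    end
  end
end

LoadReport.load_all(
  org: fn -> {:ok, "acme"} end,
  user: fn -> {:error, :not_found} end
)
# => {:error, %{user: :not_found}}
```

The caller can then pattern-match on the error map to decide how to respond per resource.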

Loading things concurrently isn’t always as good as it sounds, and it can make things a lot worse. Calling the loaders sequentially and letting different requests run concurrently against the database might be better, because it shares database resources more fairly between requests. We did this in the past and had to refactor our app away from it, because one request would start a lot of database calls and block short requests with only a few database calls that arrived a little later. So I would think hard before doing this kind of “optimization”.


@Eiji Thanks once again for the great breakdown. The organise_nested function is definitely hitting on many things I’m looking for. I’ll try to extract the essence and generalize the snippet that you shared.

@wanton7 good reminder, and thank you for sharing your path of doing this and then refactoring it to a fully sequential approach. I hit this same concern in ~ 2020 when we introduced a lot more parallelism into our codebase to reduce the processing time. I have strong signals from the last three years (metrics & traces) that show that parallelism significantly improves overall performance.

How can this be? We are still hitting the same database, right?

It seems that two factors make this assumption incorrect.

First, most resources are heavily cached, with level 1 being the local in-memory cache, level 2 being cluster distributed cache or dedicated cache storage like Redis, and level 3 being the database. So a typical request would hit multiple independent data systems.

Secondly, not all of our requests are database bound. We are a CI/CD provider where internal and external resources like virtual machines, docker registries, blob storage, and job processing units provide their internal APIs.

Sequential vs. Fully Concurrent
These are the two extremes. Loading things one-by-one or spawning up concurrent tasks without limits. In my experience, both approaches are suboptimal. The first one doesn’t take advantage of available resources when they are readily available, while the second can introduce more harm than good, like in the example you shared.
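One middle ground between the two extremes is to cap how many loads run at once. A minimal sketch using Task.async_stream/3 with its max_concurrency option; the `BoundedLoad` module, the cap of 2, the timeout and the loader functions are assumptions for illustration, not something we run in production.

```elixir
defmodule BoundedLoad do
  # loaders: keyword list of name => zero-arity function.
  # At most max_concurrency loaders run at the same time.
  def load_all(loaders, max_concurrency \\ 2) do
    loaders
    |> Task.async_stream(
      fn {name, fun} -> {name, fun.()} end,
      max_concurrency: max_concurrency,
      timeout: 5_000
    )
    # Each stream element is {:ok, value} when the task finished in time;
    # with the default options a timeout exits the caller instead.
    |> Enum.map(fn {:ok, {name, result}} -> {name, result} end)
  end
end

BoundedLoad.load_all(
  a: fn -> {:ok, 1} end,
  b: fn -> {:ok, 2} end,
  c: fn -> {:ok, 3} end
)
# => [a: {:ok, 1}, b: {:ok, 2}, c: {:ok, 3}]
```

A fixed cap is still a static choice; it does not react to how saturated the platform is.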

Ideally, I would like to express the dependencies and let the system decide when it is best to go in parallel and when it is best to go one-by-one. Something like (pseudocode):

resources = [
  {:a, &load_a/1},
  {:b, &load_b/1},
  {:c, &load_c/2, deps: [:a, :b]}
]

load(resources) # the load function now knows the dependencies and should be able to make the best decision based on the system's state and the platform's saturation.
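To make the idea a bit more concrete, here is a rough sketch of the dependency part only. It starts each resource as soon as its dependencies are loaded and relies on the documented {ref, result} reply that Task.async/1 sends to the caller. The `DepLoader` module and the {name, fun, deps} shape are assumptions that differ slightly from the pseudocode above, and there is no error handling, concurrency limit or awareness of system load.

```elixir
defmodule DepLoader do
  # specs: list of {name, fun, deps}; fun receives a map with the
  # already loaded values of its deps. Returns a map of name => value.
  def load(specs, timeout \\ 5_000), do: loop(specs, %{}, %{}, timeout)

  # Nothing pending and nothing running: we are done.
  defp loop([], running, loaded, _timeout) when map_size(running) == 0, do: loaded

  defp loop(pending, running, loaded, timeout) do
    # Start everything whose dependencies are all loaded.
    {ready, pending} =
      Enum.split_with(pending, fn {_name, _fun, deps} ->
        Enum.all?(deps, &Map.has_key?(loaded, &1))
      end)

    running =
      Enum.reduce(ready, running, fn {name, fun, deps}, acc ->
        args = Map.take(loaded, deps)
        task = Task.async(fn -> fun.(args) end)
        Map.put(acc, task.ref, name)
      end)

    if map_size(running) == 0, do: raise(ArgumentError, "unsatisfiable dependencies")

    # Wait for whichever running task replies first.
    # Note: the timeout applies to each wait, not to the whole load.
    receive do
      {ref, value} when is_map_key(running, ref) ->
        Process.demonitor(ref, [:flush])
        {name, running} = Map.pop(running, ref)
        loop(pending, running, Map.put(loaded, name, value), timeout)
    after
      timeout -> raise "timed out waiting for #{inspect(Map.values(running))}"
    end
  end
end

DepLoader.load([
  {:a, fn _ -> 1 end, []},
  {:b, fn _ -> 2 end, []},
  {:c, fn %{a: a, b: b} -> a + b end, [:a, :b]}
])
# => %{a: 1, b: 2, c: 3}
```

Here :a and :b run concurrently and :c starts once both have replied; deciding when not to run in parallel would need extra input, such as a concurrency budget.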

Once again, thank you all for sharing your input. It helps me a lot to clarify my thoughts and question my assumptions. I’ll make sure to share which direction my team will take and, of course, share some open-source code and examples.
