Graceful fallbacks using Absinthe async helper

We’ve been using Absinthe to power a mobile BFF in a Kubernetes environment for several years now. One of the challenging aspects has been handling failures gracefully, especially in an environment where the apps we depend on (including our Rails monolith) are deployed dozens of times every day. The other challenging aspect has been ensuring that we fetch as much data asynchronously as possible.

We originally used Absinthe’s async middleware, but soon discovered that it didn’t fail in a “nice” enough way (for our liking) when timeouts occurred. So we started creating things like this:

  def get_cart(args, %{context: %{forwarded_headers: headers}}) do
    span_context = %Spandex.SpanContext{
      trace_id: Tracer.current_trace_id(),
      parent_id: Tracer.current_span_id()
    }

    [&fetch_cart/3, &fetch_coupon/3, &fetch_messages/3]
    |> Task.async_stream(& &1.(args, headers, span_context),
      ordered: true,
      on_timeout: :kill_task,
      timeout: @cart_timeout,
      max_concurrency: 3
    )
    |> Enum.map(fn
      # A task that finished in time yields {:ok, result}.
      {:ok, res} -> res
      # Anything else (e.g. {:exit, :timeout} from a killed task) falls back to nil.
      _ -> nil
    end)
    |> merge_responses()
  end

When I look at this, I think: “there must be a better way”.

I’m working on a PR to change the Absinthe async helper to use Task.async_stream/3 instead of Task.await and provide a fallback option. But I’m not sure how to write a “good” test for the timeout aspect, and I’m not sure there isn’t a much better option than what I’m proposing.

Is there a better way?


One of the complexities I found with running an Elixir app in K8s is that Task.async_stream/3 uses System.schedulers_online/0 as its default max_concurrency, i.e. to determine how many operations run concurrently. In our environment this returns 1 (the number of cores assigned to the pod), which means the tasks run one at a time when using the default options.
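To make the effect of max_concurrency concrete, here is a minimal sketch. ConcurrencyDemo and its elapsed_ms/3 function are hypothetical names used only for illustration; the tasks just sleep to stand in for network calls.

```elixir
defmodule ConcurrencyDemo do
  # Runs `count` tasks that each sleep for `sleep_ms` milliseconds and
  # returns the total elapsed time in milliseconds.
  def elapsed_ms(count, sleep_ms, opts) do
    started = System.monotonic_time(:millisecond)

    1..count
    |> Task.async_stream(fn _ -> Process.sleep(sleep_ms) end, opts)
    |> Stream.run()

    System.monotonic_time(:millisecond) - started
  end
end

# The default max_concurrency is System.schedulers_online/0.
IO.inspect(System.schedulers_online(), label: "schedulers online")

# With max_concurrency: 1 the three sleeps run back to back.
IO.inspect(ConcurrencyDemo.elapsed_ms(3, 100, max_concurrency: 1), label: "one at a time (ms)")

# With max_concurrency: 3 they overlap.
IO.inspect(ConcurrencyDemo.elapsed_ms(3, 100, max_concurrency: 3), label: "all at once (ms)")
```

On a pod limited to a single scheduler, the default behaves like the first call, which is why setting max_concurrency explicitly matters for I/O-bound work.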

That’s why we explicitly set max_concurrency to the number of tasks in the code above. For the helper, I’m going to set max_concurrency to the number of fields we want to resolve asynchronously. This means it will work quite differently from the current async helper, so I’m going to create a new AsyncStream helper instead of changing the existing Async helper.


IMO that’s completely fine. I’ve used Task.async_stream with concurrency from 30 to 50 many times, especially when I know the tasks will be disk or network bound.

The default is just that: a default that works well in most scenarios. You and I (and likely a lot of others too) simply stumbled upon situations where the default isn’t the best for our work.

I had started working on using Task.async_stream by accumulating a list of fields to be resolved async.
But given the feedback in absinthe-graphql/absinthe issue #1117 (“Allow async timeouts to be rescued and an error inserted for resolution”), I’ll switch approach to using Task.yield and Task.shutdown.

The default worked fine for us, for quite some time.
As we extracted more services from our monolith, timeout and bad gateway responses became common, which is why the default no longer suits us.

The fundamental issue here isn’t that Task.async_stream is a bad fit for this case, since you do in fact have 3 things to do. Mostly I just want to point out that the way this works is fundamentally different from async middleware / resolvers.

What you have here is not an async resolver. This is a synchronous resolver that, internally, spawns and then awaits upon 3 tasks in a blocking way. No other part of your GraphQL query is being processed while this resolver runs. By contrast, asynchronous resolvers start a task and then suspend the middleware, allowing Absinthe to move forward and initiate the resolution of other fields.
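The difference can be illustrated outside Absinthe with plain tasks. This is only a rough simulation of the two scheduling styles, not Absinthe’s actual middleware; ResolutionSketch, fetch/1, blocking/1, deferred/1 and timed/1 are hypothetical names.

```elixir
defmodule ResolutionSketch do
  # Hypothetical stand-in for a field whose data takes ~100 ms to fetch.
  def fetch(name) do
    Process.sleep(100)
    {name, :ok}
  end

  # Blocking style: each "resolver" spawns a task and waits for it
  # before the next field is even started.
  def blocking(fields) do
    Enum.map(fields, fn name ->
      Task.async(fn -> fetch(name) end) |> Task.await()
    end)
  end

  # Deferred style: start a task for every field first, then collect
  # the results in a second pass, so the fetches overlap.
  def deferred(fields) do
    fields
    |> Enum.map(fn name -> Task.async(fn -> fetch(name) end) end)
    |> Enum.map(&Task.await/1)
  end

  # Returns {elapsed_ms, result} for a zero-arity function.
  def timed(fun) do
    {micros, result} = :timer.tc(fun)
    {div(micros, 1000), result}
  end
end

fields = [:cart, :profile, :orders]
IO.inspect(ResolutionSketch.timed(fn -> ResolutionSketch.blocking(fields) end), label: "blocking")
IO.inspect(ResolutionSketch.timed(fn -> ResolutionSketch.deferred(fields) end), label: "deferred")
```

Both calls return the same results, but the blocking version takes roughly the sum of the fetch times while the deferred version takes roughly the longest single fetch.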


Thanks, that’s some great feedback @benwilson512.

This hasn’t worked too badly because >99% of the time taken in our resolvers is spent making requests to other services. But it sounds like using the same approach as the Absinthe async middleware, but with Task.yield and Task.shutdown instead of Task.await, is the approach we should’ve taken.
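For reference, the Task.yield plus Task.shutdown idiom with a fallback value might look like the sketch below. FallbackSketch and run_with_fallback/3 are hypothetical names, and this is not the code from the Absinthe middleware or the proposed PR.

```elixir
defmodule FallbackSketch do
  # Runs `fun` in a task and returns its result, or `fallback` if the task
  # does not reply within `timeout` milliseconds.
  #
  # Note: Task.async/1 links the task to the caller, so a crash inside `fun`
  # still takes the caller down unless the caller traps exits.
  def run_with_fallback(fun, timeout, fallback) do
    task = Task.async(fun)

    # Task.yield/2 returns nil on timeout. In that case Task.shutdown/2
    # kills the task, returning a reply only if one arrived in the meantime.
    case Task.yield(task, timeout) || Task.shutdown(task, :brutal_kill) do
      {:ok, result} -> result
      _ -> fallback
    end
  end
end

# A fast task returns its own result.
IO.inspect(FallbackSketch.run_with_fallback(fn -> {:ok, :cart} end, 100, {:ok, nil}))

# A slow task is shut down and the fallback is returned instead of an exit.
slow = fn ->
  Process.sleep(500)
  {:ok, :cart}
end

IO.inspect(FallbackSketch.run_with_fallback(slow, 50, {:ok, nil}))
```

Unlike Task.await/2, which exits the caller on timeout, this lets the caller decide what a timeout should resolve to.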

I’m just wondering: the async middleware docs say it’s an example of building middleware. Is it something you use in production yourself? If so, how do you handle the timeouts gracefully?

I have another question. This code looks like an ideal use-case for Task.async_stream, and it isn’t hooked in via middleware.

  defp do_batching(input) do
    input
    |> Enum.group_by(&elem(&1, 0), &elem(&1, 1))
    |> Enum.map(fn {{batch_fun, batch_opts}, batch_data} ->
      system_time = System.system_time()
      start_time_mono = System.monotonic_time()

      task =
        Task.async(fn ->
          {batch_fun, call_batch_fun(batch_fun, batch_data)}
        end)

      metadata = emit_start_event(system_time, batch_fun, batch_opts, batch_data)

      {batch_opts, task, start_time_mono, metadata}
    end)
    |> Map.new(fn {batch_opts, task, start_time_mono, metadata} ->
      timeout = Keyword.get(batch_opts, :timeout, 5_000)
      result = Task.await(task, timeout)

      duration = System.monotonic_time() - start_time_mono
      emit_stop_event(duration, metadata, result)

      result
    end)
  end

Is there another reason why Task.async_stream isn’t used here?

@Catharz there is a very simple reason why Task.async_stream isn’t used here: I don’t think it had been invented yet when I wrote this code.

If we were to refactor this code to use async_stream, we need to be careful not to change the semantics of how the telemetry events relate to each task. Right now the start events are emitted just prior to each task being spawned. The stop events are actually coded in a way that I think is bad, in that emitting the stop events for tasks that finish early will be delayed if they are in a later group behind tasks that finish later.

I do believe we could get the same start semantics using Stream.map so that the telemetry events are emitted just before each item is handed to a task. The ending semantics could be improved by passing ordered: false to async_stream and doing a Stream.map afterward. PR welcome! :smiley:
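A rough sketch of that shape follows. BatchSketch, run/2, and the emit_start/emit_stop helpers are hypothetical stand-ins (they send messages to the calling process instead of emitting real telemetry), and the batch input is simplified to {key, fun} pairs rather than Absinthe’s actual batch data.

```elixir
defmodule BatchSketch do
  # Hypothetical stand-ins for the telemetry helpers in the original code.
  defp emit_start(parent, key), do: send(parent, {:start, key})
  defp emit_stop(parent, key, duration), do: send(parent, {:stop, key, duration})

  # `batches` is a list of {key, zero_arity_fun} pairs.
  def run(batches, timeout \\ 5_000) do
    parent = self()

    batches
    # Lazily emit the start event as each item is pulled for spawning.
    |> Stream.map(fn {key, fun} ->
      emit_start(parent, key)
      {key, fun, System.monotonic_time()}
    end)
    |> Task.async_stream(
      fn {key, fun, started} -> {key, fun.(), started} end,
      # Yield results as they complete rather than in input order.
      ordered: false,
      max_concurrency: max(length(batches), 1),
      timeout: timeout
    )
    # Emit the stop event as soon as each result arrives.
    |> Stream.map(fn {:ok, {key, result, started}} ->
      emit_stop(parent, key, System.monotonic_time() - started)
      {key, result}
    end)
    |> Map.new()
  end
end

slow = fn ->
  Process.sleep(150)
  :slow_result
end

IO.inspect(BatchSketch.run([{:slow, slow}, {:fast, fn -> :fast_result end}]))
```

With the default on_timeout: :exit, a timed-out batch still exits the caller, much like the Task.await call in the original code. Note that this sketch applies one timeout to every batch, whereas the original reads a timeout per batch from batch_opts.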
