Task won't run from controller, but runs fine from IEx

Hey folks,

This almost certainly isn’t a Phoenix issue, but it occurs when running my app within Phoenix. We’re building a document converter that I’m running over gRPC. My client-side interface takes a callback, starts a task, and streams results to the callback as they arrive. My Phoenix context calls this API and passes it a callback that updates rows in Postgres and pushes channel updates based on those results.

If I run my context method from an IEx session, everything works as I’d expect. My database updates, pubsub messages get pushed to my LiveViews, etc. If I run it from a web request, none of this happens.

I don’t have much case-clause handling yet, because I want to see how the thing typically fails and then add handling for those cases. It looks like my gRPC connection is being dropped, so the call inside the task fails and none of my update logic runs. I can see where the match fails (marked below), but I don’t immediately see why the connection is dropped in the first place.

My suspicion is that my task runs fine from the IEx session because the calling process stays alive. But when it’s called from a web request, the context method runs in a rendering process that goes away once the page renders. Does that seem likely? If so, how should I fix it? Should I start my task differently, use something other than a task, or…?

Here’s my code. I haven’t made it very robust because all services are running locally and, again, if something fails I’d rather let it crash and respond to that than build a lot of robustness I don’t know I’ll need. I’m also new to OTP, so I don’t want to add much complexity unless I absolutely need it. Here’s how I start my task supervisor:

    children = [
      # Start the Ecto repository
      Scribe.Repo,
      # Start the endpoint when the application starts
      ScribeWeb.Endpoint,
      # Clean up expired authentication data
      # Scribe.Auth.Cleanup,
      # Clean up expired documents.
      Scribe.Documents.Cleanup,
      # Start a task supervisor for running non-blocking conversions. This is the supervisor I use for my API.
      {Task.Supervisor, name: Scribe.TaskSupervisor},
    ]

    # See https://hexdocs.pm/elixir/Supervisor.html
    # for other strategies and supported options
    opts = [strategy: :one_for_one, name: Scribe.Supervisor]
    Supervisor.start_link(children, opts)

  # My method that starts the task
  def convert(base, format, preview, callback, opts) do
    format = format
    |> Atom.to_string()
    |> String.upcase()
    |> String.to_atom()
    password = opts[:password] || ""
    opt_settings = opts[:settings] || %{}
    large_print = BoolValue.new(value: opt_settings[:large_print] || false)
    # Access.get/3 keeps an explicit `false`; `false || true` would always be `true`.
    add_image_descriptions = BoolValue.new(value: Access.get(opt_settings, :add_image_descriptions, true))
    settings = Converter.ConvertSettings.new(
      ttsVoice: opt_settings[:tts_voice] || "neutral",
      ttsRate: opt_settings[:tts_rate] || 1.0,
      brailleTranslationTable: opt_settings[:braille_translation_table] || "en-us-g2.ctb",
      largePrint: large_print,
      addImageDescriptions: add_image_descriptions
    )
    request = Converter.ConvertRequest.new(
      base: base,
      format: format,
      password: password,
      preview: preview,
      settings: settings
    )
    {:ok, channel} = channel()
    Task.Supervisor.start_child(Scribe.TaskSupervisor, fn ->
      {:ok, stream} = Converter.Converter.Stub.convert(channel, request, timeout: 1000000) # Here's the failing match.
      Enum.each(stream, callback)
    end)
  end

  # And the relevant snippet from my `get_or_create_output` context method called from a Phoenix GET action
            {:ok, _} = Converter.convert(base, format, preview, fn(result) ->
              IO.inspect(result)
              case result do
                {:ok, result} ->
                  {:ok, document} = doc
                  |> Document.changeset(%{
                    title: result.title,
                    page_count: result.fragment_count,
                  })
                  |> Repo.update()
                  file = if result.key == "" do
                    nil
                  else
                    Path.basename(result.key)
                  end
                  {:ok, output} = output
                  |> Output.changeset(%{
                    expected_fragment_count: result.expected_fragment_count,
                    completed_fragment_count: result.completed_fragment_count,
                    is_preview: result.preview,
                    file: file,
                  })
                  |> Repo.update()
                  ScribeWeb.Endpoint.broadcast(
                    "documents/#{doc.id}/#{format}",
                    "updated",
                    %{
                      document: document,
                      output: output,
                    }
                  )
              end
            end)

Any suggestions for keeping this callback running after the controller/rendering process goes away? I do want it to run independently of the controller: multiple users may be viewing the same document, so later visitors should subscribe to the channel rather than trigger another conversion.

Thanks.


It's really hard to help from this alone. Do you have a smaller project on GitHub that demonstrates the problem?

It does sound like a lifecycle problem with LiveView, though, which I'm not familiar with yet.

Thanks, got it figured out.

The issue was that I created my gRPC channel outside of the task, which I assume in turn started a process that was killed when the calling process (not the task) terminated. Moving channel creation into the task made it work with no other changes.
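The ownership problem described above can be reproduced with nothing but the standard library. This is a sketch, not the elixir-grpc API: `FakeConn` and `Demo` are hypothetical names, and `FakeConn` assumes the real connection behaves like a process that monitors whichever process opened it and shuts down when that owner exits. `Demo.open_in_caller/1` mirrors the original `convert/5` (a short-lived caller opens the connection, a `Task.Supervisor` child uses it), while `Demo.open_in_task/1` mirrors the fix.

```elixir
defmodule FakeConn do
  # Hypothetical stand-in for a client connection: it monitors the process
  # that opened it and shuts down when that owner exits.
  def open do
    owner = self()

    pid =
      spawn(fn ->
        ref = Process.monitor(owner)
        loop(ref)
      end)

    {:ok, pid}
  end

  defp loop(ref) do
    receive do
      # The owner is gone, so the connection closes itself.
      {:DOWN, ^ref, :process, _pid, _reason} ->
        :ok

      {:request, from, payload} ->
        send(from, {:reply, self(), String.upcase(payload)})
        loop(ref)
    end
  end

  # Sends a request and waits for the reply, or returns {:error, :closed}
  # if the connection process is already dead or dies while we wait.
  def request(conn, payload) do
    ref = Process.monitor(conn)
    send(conn, {:request, self(), payload})

    receive do
      {:reply, ^conn, reply} ->
        Process.demonitor(ref, [:flush])
        {:ok, reply}

      {:DOWN, ^ref, :process, _pid, _reason} ->
        {:error, :closed}
    end
  end
end

defmodule Demo do
  # Like the original convert/5: the short-lived caller opens the connection
  # and the supervised task only uses it.
  def open_in_caller(sup) do
    in_short_lived_caller(fn parent ->
      {:ok, conn} = FakeConn.open()

      {:ok, _task} =
        Task.Supervisor.start_child(sup, fn ->
          # Give the caller time to exit, as a finished controller action would.
          Process.sleep(50)
          send(parent, {:result, FakeConn.request(conn, "hello")})
        end)
    end)
  end

  # Like the fix: the task opens, and therefore owns, the connection.
  def open_in_task(sup) do
    in_short_lived_caller(fn parent ->
      {:ok, _task} =
        Task.Supervisor.start_child(sup, fn ->
          {:ok, conn} = FakeConn.open()
          Process.sleep(50)
          send(parent, {:result, FakeConn.request(conn, "hello")})
        end)
    end)
  end

  # Runs `fun` in a throwaway process that exits as soon as `fun` returns,
  # then waits for the task to report back.
  defp in_short_lived_caller(fun) do
    parent = self()
    spawn(fn -> fun.(parent) end)

    receive do
      {:result, result} -> result
    after
      2_000 -> :no_result
    end
  end
end

{:ok, sup} = Task.Supervisor.start_link()
IO.inspect(Demo.open_in_caller(sup), label: "opened in caller")
IO.inspect(Demo.open_in_task(sup), label: "opened in task")
```

Run as a script, the first call should return `{:error, :closed}` and the second `{:ok, "HELLO"}`. Note that the task itself survives in both cases: `Task.Supervisor.start_child/2` links the task to the supervisor, not to the caller. Only the resource it borrowed from the caller disappears, which matches the symptom of a failing match on the stream rather than a task that never runs.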
