Guaranteeing job completion with Oban

Hello guys, I’m currently using Heroku workers to execute some Oban jobs. However, I’m noticing that even though every job completes its task (generating some reports and uploading them to S3), the job stays in the “retryable” state and is rescheduled every time until it runs out of attempts. On my local machine, without the workers and the Heroku “clustering”, the jobs’ status always turns to “completed”, but in my deployed implementation the jobs stop once they reach the 10 attempts I have set, and the status is set to “discarded”… Have you guys encountered a similar situation?

What is the return value from your job? If it is returning an error tuple it is considered a failure, even if the side-effect of uploading reports is successful.

You should see a list of errors in the errors field for each job (or in the detail view if you are using the UI).

I have a function that starts a GenStage pipeline. Because that function returns instantly once the pipeline is set up, I block it with a receive so that the queue is actually limited until the report is generated and uploaded to S3. The following function is what my worker module returns from perform.

  def generate_report_async(info_report, update_report, attempt) do
    {:ok, a} = GenerateReport.start_link(info_report)
    {:ok, b} = UploadToS3.start_link()

    {:ok, c} =
        report: a,
        s3: b,
        update: update_report,
        info_report: info_report,
        attempt: attempt,
        parent: self()

    GenStage.sync_subscribe(b, to: a)
    GenStage.sync_subscribe(c, to: b)

    receive do
      message ->
        message
    after
      get_timeout() -> {:ok, :finished_by_timeout}
    end
  end

The last stage (the “c” stage) sends a tuple of the form {:ok, :report_ready} (which gets bound to the message variable) to the blocked function, because one of its parameters is the PID of the executing job. I added a timeout to the receive in case one of the stages fails (not really sure it should have it). The timeout is set through a config var, in this case 15 mins (in milliseconds). However, based on your suggestion, I think that timeout is messing with the implementation, because most of the errors look like the following:

{"{\"at\": \"2020-02-25T20:21:56.365686Z\", \"error\": \"** (ErlangError) Erlang error: :timeout_value\\n    (api) lib/api/commands/persist_reports.ex:142: Api.Commands.PersistReports.generate_report_async/3\\n    (oban) lib/oban/queue/executor.ex:65: Oban.Queue.Executor.perform/2\\n    (stdlib) timer.erl:197:\\n    (oban) lib/oban/queue/executor.ex:13:\\n    (elixir) lib/task/supervised.ex:90: Task.Supervised.invoke_mfa/2\\n    (elixir) lib/task/supervised.ex:35: Task.Supervised.reply/5\\n    (stdlib) proc_lib.erl:249: :proc_lib.init_p_do_apply/3\\n\", \"attempt\": 1}"}

FWIW, passing "5000" for the timeout in after will give exactly the error you’re seeing in the logs.
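To make that concrete, here is a minimal sketch that reproduces the error outside of Oban. The module name TimeoutValueSketch and the function wait/1 are made up for illustration; the receive only matches a message tagged with a fresh reference that nobody else holds, so it always falls through to the after clause.

```elixir
defmodule TimeoutValueSketch do
  # Hypothetical helper: blocks until `timeout` elapses.
  # The receive clause can never match, because `ref` is never shared.
  def wait(timeout) do
    ref = make_ref()

    receive do
      {^ref, message} -> message
    after
      timeout -> :timed_out
    end
  end
end

# An integer number of milliseconds is a valid timeout.
TimeoutValueSketch.wait(10)
# => :timed_out

# A string is not a valid timeout, even if it only contains digits.
result =
  try do
    TimeoutValueSketch.wait("5000")
  rescue
    e in ErlangError -> e.original
  end

IO.inspect(result)
# => :timeout_value
```

The after clause only accepts a non-negative integer or :infinity, so anything else raises as soon as the process would start waiting.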


I pass 1200000 milliseconds, just in case, but generating a report and uploading it never takes longer than 4 minutes. I gotta be missing something there… :thinking:

What is the implementation of get_timeout()?

This is the current implementation:

def get_timeout(), do: Application.fetch_env!(:api, :timeout)

In my dev.exs I have:

config :api,
  timeout: 900_000

And on production:

config :api,
  timeout: Map.fetch!(System.get_env(), "REPORTS_TIMEOUT")

With a timeout of 1_200_000 as a config var on Heroku.

Map.fetch!(System.get_env(), "REPORTS_TIMEOUT") will return a string, which is what @al2o3cr was hinting at. So you’ll need to convert that to an integer before using it as a timeout value.

Also, if you’re using Elixir 1.9 or later, you can use System.fetch_env!/1 instead of that Map.fetch!/2 call.
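As a rough sketch of the conversion, something like the following could work. TimeoutConfigSketch and parse_timeout!/1 are hypothetical names, not part of Elixir or Oban; the function takes the raw string an environment variable would give you and returns an integer number of milliseconds, raising on anything else.

```elixir
defmodule TimeoutConfigSketch do
  # Hypothetical helper: turns the raw env var string into milliseconds.
  # Raises ArgumentError unless the whole string is a non-negative integer.
  def parse_timeout!(raw) when is_binary(raw) do
    case Integer.parse(String.trim(raw)) do
      {ms, ""} when ms >= 0 ->
        ms

      _ ->
        raise ArgumentError,
              "expected a non-negative integer timeout, got: #{inspect(raw)}"
    end
  end
end

# "1200000" stands in for the value of REPORTS_TIMEOUT.
IO.inspect(TimeoutConfigSketch.parse_timeout!("1200000"))
# => 1200000
```

Inside a config file, where application modules are not compiled yet, the shorter String.to_integer(System.fetch_env!("REPORTS_TIMEOUT")) should do the same job. Either way the config var has to hold plain digits: a value written as 1_200_000 is fine in Elixir source but is rejected when parsed from a string.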


Of course! That was the case. Why on earth did I assume that config vars had the type I was expecting… Thank you very much for pointing out the obvious, @sorentwo @al2o3cr @axelson