Using Oban on functions that return

Hello guys, I’ve been checking out Oban for implementing some processing queues in my application. One thing I’ve been wondering about: enqueueing a job does not let you use the return value of whatever business logic runs in the worker module. For example, when the following code is executed:

%{id: 1, in_the: "business", of_doing: "business"}
|> MyApp.Business.new()
|> Oban.insert()

The perform/2 function from the MyApp.Business module returns something I need, but the insert at the end of the pipeline returns the Oban job struct, with arguments and metadata that I do not need at the moment. Is there a way to enqueue a job and still use the value returned by the worker module’s perform/2 function?

No.

In general what you want is not possible in asynchronous background processors, by design.

The job does not need to run immediately. It can take a couple of seconds, minutes, or even hours before the job actually runs.

A common approach is to let the job write its result to the filesystem or a database. You might also need to store additional information about the job’s current state in your FS/DB.

@tovarchristian21 if you can block until the stuff is done, can you just ditch oban entirely and simply call whatever is inside perform/2?

That makes a lot of sense to me, @benwilson512. Since I’m using Oban for a single queue with a limit of 1 process, I basically just want to execute whatever is inside perform/2, one at a time. Do you know how to achieve something like that in Elixir, i.e. how to block a process until some message is received or until some other function ends?

If your goal is to ensure that exactly one process can be executing the code inside perform/2 at a time, what about wrapping that logic in a GenServer? You’d use GenServer.call to send a request, and the caller would block until the GenServer replied with the result. In that scenario, you’re using the GenServer’s process inbox as a work queue.
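
A minimal sketch of that idea, assuming a hypothetical MyApp.SerialRunner module that stands in for whatever currently lives inside perform/2. The module and function names are illustrative only and are not part of Oban:

```elixir
defmodule MyApp.SerialRunner do
  # Hypothetical module: runs submitted functions one at a time.
  use GenServer

  def start_link(opts \\ []) do
    GenServer.start_link(__MODULE__, :ok, opts)
  end

  # Blocks the caller until the function has run inside the server process.
  def run(server, fun, timeout \\ 5_000) when is_function(fun, 0) do
    GenServer.call(server, {:run, fun}, timeout)
  end

  @impl true
  def init(:ok), do: {:ok, nil}

  @impl true
  def handle_call({:run, fun}, _from, state) do
    # A GenServer handles one message at a time, so the calls are serialized
    # and later callers wait in the process mailbox.
    {:reply, fun.(), state}
  end
end

{:ok, pid} = MyApp.SerialRunner.start_link()
result = MyApp.SerialRunner.run(pid, fn -> 1 + 1 end)
IO.inspect(result)
```

The caller gets the function's return value directly, which is the part that enqueueing through Oban.insert does not provide.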

If the caller crashes or stops, what should happen to the job?

A GenServer is often used to serialize work in the way you describe. The problem with this, again, is that work can in theory pile up behind the serialization point until a call no longer finishes within the default timeout of 5 seconds, and your caller will then crash.
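
To illustrate the timeout, here is a small sketch with a hypothetical MyApp.SlowServer that simply sleeps. It passes an explicit timeout as the third argument to GenServer.call and catches the resulting exit instead of letting the caller crash:

```elixir
defmodule MyApp.SlowServer do
  # Hypothetical server that simulates slow, serialized work.
  use GenServer

  @impl true
  def init(:ok), do: {:ok, nil}

  @impl true
  def handle_call({:sleep, ms}, _from, state) do
    Process.sleep(ms)
    {:reply, :done, state}
  end
end

{:ok, pid} = GenServer.start_link(MyApp.SlowServer, :ok)

result =
  try do
    # The work takes 200 ms but the caller only waits 50 ms.
    GenServer.call(pid, {:sleep, 200}, 50)
  catch
    # A timed-out call exits the caller with {:timeout, _} unless it is caught.
    :exit, {:timeout, _} -> {:error, :timeout}
  end

IO.inspect(result)
```

Catching the exit only protects the caller. The server still finishes the slow request, so the backlog itself does not go away.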

Is it worth waiting potentially a long time for the result? And what is the reason you want to serialize those perform calls? Quite often it is not necessary to serialize the whole call, only a small section of it, or even nothing.

Make sure that it is really necessary to serialise before trying. In many cases we chose Erlang/Elixir because we want to avoid serialisation…

Task.async/await?
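
For reference, a minimal sketch of that approach. The anonymous function is a placeholder for the real work that would otherwise sit inside perform/2:

```elixir
# Start the work in a separate process linked to the caller.
task = Task.async(fn -> Enum.sum(1..100) end)

# Block until the task replies, or exit if it takes longer than 5 seconds.
result = Task.await(task, 5_000)

IO.inspect(result)
```

Unlike the GenServer approach, this does not limit concurrency by itself: every call to Task.async starts a new process.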

On the surface this doesn’t seem like a good use of Oban. It’s hard to say exactly what you need without more details, as others have noted in this discussion.

  • If the goal is to do work within the same node in a blocking manner then you can just call a function directly and wait for the result.
  • If the work is stateful or particularly expensive then you need to introduce a bottleneck or rate limit it by putting it in a GenServer. Using GenServer.call as @al2o3cr suggested will ensure that only one request is handled at a time.
  • If the goal is to do work across many nodes with a global rate limit then Oban with a queue limit of 1 may work for you.

This all depends on what environment you’re running in (are nodes connected or are you in something like Heroku?), what type of data you are returning (a giant binary would be horrible to send via pubsub), the number of nodes you’re running (if you have exactly one node then you don’t need global synchronization), etc.

If you can state the problem you’re trying to solve, rather than the solution you currently have, then we can help more effectively.

I have similar cases like this

  • Use async job queue for scaling out worker (may be in different node)
  • Make the async job transparent to the caller
    • Making the caller a process and letting it receive a message is not an option

I’m thinking of having a GenServer process that does the following when it starts (e.g. using handle_continue/2):

  • Place the async job and keep the job ref
  • Poll or subscribe to job status changes

Then run the GenServer under a DynamicSupervisor and call GenServer.call on it, so that the caller sees it as a blocking call.
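
The polling half of that plan could look roughly like the sketch below. MyApp.Poller and the fetch function are hypothetical. In a real app the fetch function would look up the job's current status (for example by reloading the job row), while here an Agent counter simulates a job that completes on the third check:

```elixir
defmodule MyApp.Poller do
  # Hypothetical helper: calls `fetch` until it returns {:ok, value}
  # or the timeout (in milliseconds) has elapsed.
  def await(fetch, timeout \\ 5_000, interval \\ 100) when is_function(fetch, 0) do
    deadline = System.monotonic_time(:millisecond) + timeout
    loop(fetch, deadline, interval)
  end

  defp loop(fetch, deadline, interval) do
    case fetch.() do
      {:ok, value} ->
        {:ok, value}

      :pending ->
        if System.monotonic_time(:millisecond) >= deadline do
          {:error, :timeout}
        else
          Process.sleep(interval)
          loop(fetch, deadline, interval)
        end
    end
  end
end

# Simulated job status: reports :pending twice, then completes.
{:ok, counter} = Agent.start_link(fn -> 0 end)

fetch = fn ->
  case Agent.get_and_update(counter, fn n -> {n + 1, n + 1} end) do
    n when n >= 3 -> {:ok, :completed}
    _ -> :pending
  end
end

result = MyApp.Poller.await(fetch, 1_000, 10)
IO.inspect(result)
```

Polling is simple but adds latency of up to one interval and extra load per waiting caller, which is why subscribing to status changes is the more attractive option.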

Questions

  • Any feedback on the plan?
  • @sorentwo Is there a way to subscribe to job status changes via Oban? I’ve done this with Postgrex notifications. Do you think it would be a good addition to Oban? If so, I’ll open an issue on GitHub.

There are internal mechanisms for doing this, but nothing public and no blessed way to wait for a job to complete. This has come up a lot recently and I think there is a case for a built-in system to await jobs. Please do open an issue :+1:

Can you elaborate on the concrete use case? I guess I just don’t see the point of something like this yet. Other languages reach for jobs in situations like this to avoid blocking, but blocking isn’t a danger in Elixir. What scenario needs Oban to execute a job, while blocking the caller of the job, which couldn’t be done by just doing the work synchronously in the caller process without Oban at all?

Serving a synchronous external API (e.g. an HTTP API) for jobs that are performed asynchronously outside of the app, such as via a message queue (Kafka) or a background job (Oban).

Of course, I know it’s better to have an external API capable of async operation (like returning 202 and providing another API to check the status, or WebSocket / GraphQL for push), but that’s not always an option in the real world.

I have a few concrete real-world examples:

  1. The system runs on heterogeneous nodes where some only process media and do exports, some handle web requests, that kind of thing. Clients are connected to channels on the web node and need to be notified when work is finished on a worker node.
  2. Jobs in a workflow depend on work done in upstream jobs, like stages of processing. When a downstream job runs it waits for an upstream job to finish.

In both cases there are workarounds available; namely polling and using some outside pubsub mechanism. I’d much prefer to write something like this in my application:

%{some: "args"}
|> MyApp.Worker.new()
|> Oban.insert!()
|> Oban.await(15_000)

Where await/2 would return the value in a success tuple when it finishes processing.

@chulkilee @sorentwo Those are completely sensible use cases, but I suppose the bit I was questioning was whether a first class bespoke solution is better in the long run vs finding a way to utilize existing community tools. Conceptually, the client isn’t waiting on a job to finish, the client is waiting on a specific domain related thing to happen. By publishing, you gain complete control over the value sent out, you can even send out multiple events within the same job as various sub parts complete. All of these are completely orthogonal to the fact that the domain related activity is occurring within a job. Coupling those two feels problematic to me.

I also have library scope concerns. Obviously it’s @sorentwo’s library and he can do with it what he wants, but I worry about scope creep when I see the library looking to duplicate work already found within popular and well tested libraries within the community. I just know that the moment you deliver this feature, folks will want to be able to easily add job identities, and then await on the status of those jobs on reconnect so that you could populate e.g. a list of image resize operations that are in progress. That kind of information though is extremely domain specific, and you don’t want to push those attributes into the job row.

Perhaps there is a middle ground here. Maybe in lieu of building a built in pubsub, Oban could provide some useful hooks or easy to follow guide on broadcasting stuff from within a job to interested subscribers? If the concern is that people don’t want to add Erlang distribution, maybe the missing piece is a postgres based Phoenix PubSub implementation? I guess I’m pushing for a more modular solution here.
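
As a rough single-node illustration of that "publish a domain event from within the job" idea, the sketch below uses Elixir's built-in Registry as a local pubsub. The registry name MyApp.Events, the {:resize, 42} topic, and the message shape are all made up for the example. Registry only works within one node, so a multi-node setup would need something like Phoenix PubSub or Postgres notifications instead:

```elixir
# A registry with duplicate keys lets many processes subscribe to one topic.
{:ok, _} = Registry.start_link(keys: :duplicate, name: MyApp.Events)

# Subscriber side: the waiting process registers interest in a domain topic.
{:ok, _} = Registry.register(MyApp.Events, {:resize, 42}, [])

# Publisher side: this would run inside the job once the work is done.
Registry.dispatch(MyApp.Events, {:resize, 42}, fn entries ->
  for {pid, _value} <- entries do
    send(pid, {:resize_done, 42, "thumb.png"})
  end
end)

# Subscriber side again: wait for the domain event, not for "the job".
result =
  receive do
    {:resize_done, 42, path} -> {:ok, path}
  after
    1_000 -> {:error, :timeout}
  end

IO.inspect(result)
```

The job decides what to publish and when, so the subscriber never has to know that the work happened inside a background job.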

Thank you for the well reasoned and insightful response. You bring up some great points and I’m inclined to agree, I have similar concerns.

This is entirely true. While some type of await is possible you couldn’t do yield because the value isn’t stored anywhere—and it absolutely shouldn’t be.

This seems like the correct approach. There is already a lot of pubsub/notification handling in the library but it is private and lacks a few conveniences. Making that public with the addition of a few convenience functions would make all of these async workflow use-cases possible.

With a minor code change and some documentation this would be easy to do within your own app:

defmodule MyApp.ObanUtils do
  import Oban.Notifier, only: [signal: 0]

  alias Oban.Notifier

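  # Inserts the job, then blocks until a "complete" notification for that job
  # arrives or the timeout elapses.
  def insert_await(changeset, timeout \\ 5_000) do
    Notifier.listen([:signal])

    # Bind the id to a plain variable: the pin operator only accepts variables,
    # so ^job.id would not compile.
    %{id: job_id} = job = Oban.insert!(changeset)

    receive do
      {:notification, signal(), %{"action" => "complete", "job_id" => ^job_id} = payload} ->
        {:ok, payload["result"]}
    after
      timeout ->
        {:error, :timeout, job}
    end
  end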

  def notify(job, result) do
    Notifier.notify(signal(), %{action: :complete, job_id: job.id, result: result})
  end
end

defmodule MyApp.Worker do
  use Oban.Worker

  alias MyApp.ObanUtils

  @impl Oban.Worker
  def perform(_args, job) do
    result = do_some_work()

    ObanUtils.notify(job, result)

    :ok
  end
end

You’re then explicitly controlling when and how you broadcast and you’re aware of the values that are being returned.

I agree with @benwilson512 and @sorentwo that Oban’s responsibility should be well defined.

For interfaces, I’ve seen different patterns. What are your thoughts? Do we have an established convention in Elixir libraries?

  • app code calls an Oban function and passes a callback function (like some Erlang functions)
  • app defines a pre/post hook (Plug / Tesla style), which I don’t think exists
  • app uses basic per-process messages (as shown in the example, using Notifier.listen/1 and receive)
  • app subscribes to telemetry: this probably doesn’t work if a job is performed by another node
  • app runs an additional supervisor provided by Oban, passing a custom module with handle_event (like Postgrex.Notifications)

Some notes

  • Unless it’s strictly needed, it shouldn’t require an Erlang cluster, since I don’t think we need global knowledge here. We just need a way for a process to subscribe to specific messages
  • How to subscribe to events: I’d say start with basic ones (e.g. job id, job status, …), but I’m curious about others’ experience with content-based event routing (e.g. process A subscribes to any Kafka messages having record_id=A), which my team has implemented. I think this could be another general design discussion…

Sorry for the late response, guys. The posted code is very similar to the approach I’ve used. The job that was inserted into Oban was a function that “prepares” a gen_stage for generating some data. This function executed very quickly, since it only started the necessary GenServers for the data feed. For that reason I had to incorporate some sort of blocking mechanism, so that only 1 job would be executing at a time (with a limit of 1 for the queue). The reason for having a single job on the queue was basically that the app can only handle so many heavy operations, which consume a good amount of resources. Something as basic as a receive with a timeout was all I needed. Perhaps Oban looks like way too much just to limit the amount of concurrent work to 1, but there are a bunch of benefits and value that I receive from this library, so it is definitely worth having it in the codebase (for example the status checking that restarts jobs that have not finished yet). Thank you all very much for sharing your opinions and being so helpful.

BTW, are there any differences when using Oban on something like Heroku? I feel like the application behaves differently when deployed compared to my local machine. Do dynos work as nodes for Oban? Does the number of them matter?

There aren’t any differences. A dyno is an instance of your app, otherwise known as a node. You can scale them up and down as you wish, and Oban will distribute jobs between the nodes.