Implementing Protocol for Schemas vs Behaviours

We’re trying to implement a scheduler which will poll different queue tables and process the jobs there. I’ve looked into Oban and Rihanna but we’d like to roll our own as there are a few differences between what we’re trying to achieve and what these libraries do. Though Oban does exactly what we want, I don’t think we want to pay for the pro version.

That said, we’ve got two tables each of which contains different job types, i.e. each table has a few different columns besides having a few other common columns as well. We’re inserting jobs into the queue and we’d like to keep updating those jobs.
In fact, we want to update their payloads because the payloads are actually payloads for an API call to a 3rd party service, hence we want to keep updating the record instead of inserting new jobs into the queue. Eventually there’ll be a limited number of jobs. Not sure whether that’s still considered a queue but anyway. :sweat_smile:

The idea is to fetch those jobs in batches, merge their payloads and make one API call because of the rate limiting we have.
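
To make the batching idea concrete, here is a minimal sketch. It assumes each job carries its payload as a map and that "merging" means a shallow `Map.merge/2` where later jobs win on conflicting keys. `BatchSketch`, the `:payload` key, and the sample data are hypothetical names for illustration only.

```elixir
defmodule BatchSketch do
  # Hypothetical helper: split jobs into batches and merge each batch's
  # payload maps into a single request body (one API call per batch).
  # Assumption: a shallow merge is enough; later jobs override earlier keys.
  def merged_payloads(jobs, batch_size) do
    jobs
    |> Enum.chunk_every(batch_size)
    |> Enum.map(fn batch ->
      Enum.reduce(batch, %{}, fn job, acc -> Map.merge(acc, job.payload) end)
    end)
  end
end

jobs = [
  %{id: 1, payload: %{"a" => 1}},
  %{id: 2, payload: %{"b" => 2}},
  %{id: 3, payload: %{"a" => 3}}
]

# Two batches: jobs 1 and 2 merged together, job 3 on its own.
merged = BatchSketch.merged_payloads(jobs, 2)
```

If the payloads are nested, a deep merge would be needed instead; that depends on what the 3rd party API expects.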

In my implementation the structure is as follows:

updater/
├─ jobs/
│  ├─ job_type1.ex
│  ├─ job_type2.ex
├─ config.ex
├─ job.ex
├─ scheduler.ex
updater.ex

Updater is a supervisor that starts a Task.Supervisor and the Scheduler, a GenServer responsible for polling the DB for jobs.
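
A rough sketch of that supervision tree, using only the standard library. The `Updater.TaskSupervisor` name, the `:interval` option, and the `:poll` message are assumptions for illustration; the actual DB query is left as a placeholder.

```elixir
defmodule Updater.Scheduler do
  use GenServer

  def start_link(opts), do: GenServer.start_link(__MODULE__, opts, name: __MODULE__)

  @impl true
  def init(opts) do
    # Hypothetical option: how often to poll, in milliseconds.
    interval = Keyword.get(opts, :interval, 5_000)
    schedule_poll(interval)
    {:ok, %{interval: interval}}
  end

  @impl true
  def handle_info(:poll, state) do
    # Placeholder: fetch a batch here and hand it off, for example with
    # Task.Supervisor.start_child(Updater.TaskSupervisor, fn -> ... end)
    schedule_poll(state.interval)
    {:noreply, state}
  end

  defp schedule_poll(interval), do: Process.send_after(self(), :poll, interval)
end

defmodule Updater do
  use Supervisor

  def start_link(opts \\ []), do: Supervisor.start_link(__MODULE__, opts, name: __MODULE__)

  @impl true
  def init(opts) do
    children = [
      {Task.Supervisor, name: Updater.TaskSupervisor},
      {Updater.Scheduler, opts}
    ]

    # :rest_for_one restarts the scheduler if the task supervisor goes down.
    Supervisor.init(children, strategy: :rest_for_one)
  end
end

{:ok, sup} = Updater.start_link(interval: 50)
```

Rescheduling from inside `handle_info/2` means the next poll is only queued after the current one has been handled, so polls never pile up.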

The Job module is meant to enqueue, fetch (in batches as well) and process (not sure about that one) jobs.

My question is how can I have a generic job type so that I wouldn’t have to write the same functions for each job type I have? Since I have different columns in my queue tables (job_type1 and job_type2 are basically schemas corresponding to those queue tables) if I want to enqueue a job for example, I have to write two enqueue functions (one for job_type1 and another for job_type2) each pattern matching a column that’s unique to that queue table. Also Repo.insert would have to handle the conflict as well since we want to update the job on conflict.

Does it make sense to use protocols for each job type like in the snippet below? Looking at the description below I’d say yes but on the other hand, for example, for functions like fetch we don’t really need every job type to implement that and you can’t pass a struct to fetch anyway. For the fetch we need to pass the name of the job so we can query that job table. :thinking:

I didn’t really use protocols before and I don’t really know the practical differences between behaviours and protocols. So I wanted to consult you people.

Protocols are a mechanism to achieve polymorphism in Elixir when you want behavior to vary depending on the data type.

So what I would do is,

defprotocol Job do
  def process(job)

  def enqueue(job)

  # Other APIs
end

defmodule JobType1 do
  use Ecto.Schema

  schema "job_type_1_queue" do
     # fields ...
  end
end

defimpl Job, for: JobType1 do
  def enqueue(_job) do
    # Insert and update the job here
  end
end

Does that make sense or do you think I could do better with behaviours, if so how?

Cheers!

Edit: Having the enqueue function in a Protocol doesn’t make sense either because then I will not have a struct to “enqueue” a job anyway, it’ll only be a map with params. What enqueue should do is to insert into the table in the first place. :man_facepalming:

Another one: Now I am thinking maybe implementing a behaviour in a schema makes more sense? So I could use apply/3 on each job type? :thinking:
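
For comparison, a minimal sketch of the behaviour route. A behaviour dispatches on a module rather than on a struct, so it also works when all you have is a map of params or the name of a job type. The callbacks (`table/0`, `build/1`, `process/1`) and the `account_id` column are hypothetical, and the insert itself is stubbed out so the example runs without a database.

```elixir
defmodule Updater.Job do
  # Hypothetical contract each job type module has to fulfil.
  @callback table() :: String.t()
  @callback build(params :: map()) :: map()
  @callback process(batch :: [map()]) :: :ok | {:error, term()}

  # Generic code is written once; the module argument selects the job type.
  def enqueue(job_module, params) do
    row = job_module.build(params)
    # A real version would upsert `row` into job_module.table() here.
    {:ok, Map.put(row, :table, job_module.table())}
  end
end

defmodule Updater.Jobs.JobType1 do
  @behaviour Updater.Job

  @impl true
  def table, do: "job_type_1_queue"

  # Pattern matches on the column that is unique to this queue table.
  @impl true
  def build(%{"account_id" => id, "payload" => payload}),
    do: %{account_id: id, payload: payload}

  @impl true
  def process(_batch), do: :ok
end

result = Updater.Job.enqueue(Updater.Jobs.JobType1, %{"account_id" => 1, "payload" => %{}})
```

Calling `job_module.build(params)` on a module held in a variable already works in Elixir, so `apply/3` is only needed when the function name is dynamic too.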

Hopefully last one: Then maybe instead of behaviours I should just implement every job related function (like enqueue and process) in job.ex for all the job types and not worry about the duplication? It wouldn’t be the exact same code but anyway. :man_shrugging:

Are you sure?

  • We’re happily using the free version - so maybe you can do that too?
  • Have you tried to estimate how much effort/money it’s going to take you to implement the Oban features? Handling errors, making sure a job is run only once, backoff, unique jobs and much, much more.

Especially if you are at a company, I cannot fathom that the cost of Oban will exceed the cost of developer time to re-implement those features.

I am with you, but I noticed that businesses really don’t want dollar amounts attached to coding lines.

A lifetime ago, it was super hard to “sell” Sidekiq Pro (Ruby on Rails’ “official” job queue library) to several teams I consulted for at different clients. The mere idea that something in the code would stop working if they forgot to pay an annual subscription made them adamant that we had to look for free alternatives.

No clue why that is but it’s a very persistent mindset, apparently.

Makes sense in certain setups. Still, I’d go with the free version of Oban and only rebuild the components that are paid.

In general, I think you’d be better off passing only the ID to the job and fetching the latest payload when the job is executing. That way, you’ll get rid of some of the concurrency/timing issues.

Well, you’re all right and thanks for the input.

Maybe we have to give Oban a second look. The thing is, we started implementing our own solution without considering all the alternatives, hence what we’ve implemented doesn’t really overlap with how Oban handles things. I am still not sure whether we could achieve what we want with Oban, though. I’d have to check its docs further as I’ve never used it before.

How do I extend it? Do I just clone it and change the things I want or can I somehow write plugins for Oban? In our case for example, we would probably not insert a job but upsert a job depending on one of the params in the args jsonb which, in our implementation, is a column so we can update on_conflict.

Never done that, so not sure. But I think plugins are an option.

For the specific use case, I think you’d be better off passing just the ID to the job and using unique jobs which would effectively give you an upsert.

Right! I think unique jobs just does what we want and it even gives an option to update depending on a key from the args.

:keys — A specific subset of the :args to consider when comparing against historic jobs. This allows a job with multiple key/value pairs in the args to be compared using only a subset of them.

But then the question is that we do not have the same keys for different queues, hence I am not sure whether we can check for conflicts depending on different keys.
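
On the different-keys question: in Oban the `unique` options are declared per worker, so each job type can name its own `:keys`. A hedged sketch follows, assuming Oban is installed and configured in the app. The `MyApp.Workers.*` modules, the queue names, and the `account_id` / `order_id` args are hypothetical.

```elixir
defmodule MyApp.Workers.JobType1 do
  # Uniqueness for this worker is decided by "account_id" in the args.
  use Oban.Worker,
    queue: :job_type1,
    unique: [period: :infinity, keys: [:account_id]]

  @impl Oban.Worker
  def perform(%Oban.Job{args: %{"account_id" => _account_id}}) do
    # Assumption: load the latest payload by ID here, then call the API.
    :ok
  end
end

defmodule MyApp.Workers.JobType2 do
  # A different worker can compare on a different key.
  use Oban.Worker,
    queue: :job_type2,
    unique: [period: :infinity, keys: [:order_id]]

  @impl Oban.Worker
  def perform(%Oban.Job{args: %{"order_id" => _order_id}}) do
    :ok
  end
end

# Inserting twice with the same account_id should yield a single job:
# %{account_id: 42} |> MyApp.Workers.JobType1.new() |> Oban.insert()
```

Which job states count towards uniqueness is controlled by the `:states` option, so it is worth checking the defaults for the Oban version in use.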

And then the only thing left for us would be to get the jobs in batches and process them.

Oban supports plugins (there is a guide on it) and the core engine is pluggable (there is a documented behaviour). That is how it is extensible enough to build Pro.

That said, building distributed partitioned rate limiting on your own isn’t easy by any stretch of the imagination.

We don’t want it to be distributed yet, as far as I know that’s not possible with the current infrastructure.
I’ve been reading the source code of Oban and I’ve learnt a lot. Thanks for such a great tool!