We’re trying to implement a scheduler that polls different queue tables and processes the jobs in them. I’ve looked into Oban and Rihanna, but we’d like to roll our own, as there are a few differences between what we’re trying to achieve and what these libraries do. Though Oban does exactly what we want, I don’t think we want to pay for the pro version.
That said, we’ve got two tables, each containing a different job type, i.e. each table has a few columns of its own in addition to a few common ones. We insert jobs into the queue and then keep updating them.
In fact, we want to update their payloads, because each payload is the body of an API call to a third-party service; hence we keep updating the existing record instead of inserting new jobs into the queue. So there will only ever be a limited number of jobs in the end. Not sure whether that’s still considered a queue, but anyway.
The idea is to fetch those jobs in batches, merge their payloads, and make one API call per batch because of the rate limiting we’re subject to.
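To make the batching step concrete, here’s a minimal sketch of what I mean, assuming payloads are plain maps; `Batcher`, the batch size, and `call_api` are all made-up names, not real code from our app:

```elixir
defmodule Batcher do
  @batch_size 50

  # Split the fetched jobs into batches, merge each batch's payloads into
  # one map, and make a single API call per batch. `call_api` is a
  # stand-in for the real third-party client.
  def run(jobs, call_api \\ fn merged -> {:ok, merged} end) do
    jobs
    |> Enum.chunk_every(@batch_size)
    |> Enum.map(fn batch ->
      merged = Enum.reduce(batch, %{}, fn job, acc -> Map.merge(acc, job.payload) end)
      call_api.(merged)
    end)
  end
end
```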
In my implementation the structure is as follows:
updater/
├─ jobs/
│ ├─ job_type1.ex
│ ├─ job_type2.ex
├─ config.ex
├─ job.ex
├─ scheduler.ex
updater.ex
`Updater` is a supervisor which will start a `Task.Supervisor` and the `Scheduler`, a GenServer responsible for polling the DB for jobs. The `Job` module is there to `enqueue`, `fetch` (in batches as well) and `process` (not sure about that one) jobs.
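For context, a minimal sketch of that supervision tree, with the actual DB polling stubbed out (the 5-second interval and the module names other than `Updater`/`Scheduler` are assumptions):

```elixir
defmodule Updater do
  use Supervisor

  def start_link(opts \\ []), do: Supervisor.start_link(__MODULE__, opts, name: __MODULE__)

  @impl true
  def init(_opts) do
    children = [
      {Task.Supervisor, name: Updater.TaskSupervisor},
      Updater.Scheduler
    ]

    Supervisor.init(children, strategy: :one_for_one)
  end
end

defmodule Updater.Scheduler do
  use GenServer

  @poll_interval :timer.seconds(5)

  def start_link(opts \\ []), do: GenServer.start_link(__MODULE__, opts, name: __MODULE__)

  @impl true
  def init(state) do
    schedule_poll()
    {:ok, state}
  end

  @impl true
  def handle_info(:poll, state) do
    # In the real thing this would query the queue tables and hand each
    # batch to the Task.Supervisor; here it is stubbed out.
    Task.Supervisor.start_child(Updater.TaskSupervisor, fn -> :ok end)
    schedule_poll()
    {:noreply, state}
  end

  defp schedule_poll, do: Process.send_after(self(), :poll, @poll_interval)
end
```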
My question is: how can I have a generic job type so that I don’t have to write the same functions for each job type I have? Since my queue tables have different columns (`job_type1` and `job_type2` are basically schemas corresponding to those queue tables), if I want to enqueue a job, for example, I have to write two `enqueue` functions (one for `job_type1` and another for `job_type2`), each pattern matching on a column that’s unique to its queue table. `Repo.insert` would also have to handle the conflict, since we want to update the job on conflict.
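For the upsert part, Ecto’s `Repo.insert/2` supports the `:on_conflict` and `:conflict_target` options, so it could look something like this (the `:external_id` column is a made-up example, and this assumes a unique index on it):

```elixir
# Insert a new job, or replace the payload of the existing one on conflict.
%JobType1{external_id: id, payload: payload}
|> Repo.insert(
  on_conflict: {:replace, [:payload, :updated_at]},
  conflict_target: :external_id
)
```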
Does it make sense to use protocols for each job type, like in the snippet below? Looking at the description below I’d say yes, but on the other hand, for functions like `fetch` we don’t really need every job type to implement it, and you can’t pass a struct to `fetch` anyway: for the fetch we need to pass the name of the job so we can query that job table.
I haven’t really used protocols before and I don’t know the practical differences between behaviours and protocols, so I wanted to consult you people.
> Protocols are a mechanism to achieve polymorphism in Elixir when you want behavior to vary depending on the data type.
So what I would do is:

```elixir
defprotocol Job do
  def process(job)
  def enqueue(job)
  # Other APIs
end

defmodule JobType1 do
  use Ecto.Schema

  schema "job_type_1_queue" do
    # fields ...
  end
end

defimpl Job, for: JobType1 do
  def enqueue(_job) do
    # Insert or update the job here
  end

  def process(_job) do
    # Process the job here
  end
end
```
Does that make sense, or do you think I could do better with behaviours? If so, how?
Cheers!
Edit: Having the `enqueue` function in a protocol doesn’t make sense either, because then I won’t have a struct to “enqueue” a job with anyway; it’ll only be a map of params. What `enqueue` should do is insert into the table in the first place.
Another one: Now I am thinking maybe implementing a behaviour in each schema makes more sense? Then I could use `apply/3` on each job type.
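To be explicit about the behaviour idea, something like this is what I have in mind; everything except the `enqueue`/`process` names is made up, and the bodies are just stubs:

```elixir
defmodule JobBehaviour do
  # Each job-type module implements these; params/jobs are plain maps here.
  @callback enqueue(params :: map()) :: {:ok, map()} | {:error, term()}
  @callback process(job :: map()) :: :ok | {:error, term()}
end

defmodule JobType1 do
  @behaviour JobBehaviour

  @impl true
  def enqueue(params), do: {:ok, Map.put(params, :queue, "job_type_1_queue")}

  @impl true
  def process(_job), do: :ok
end

# Generic call sites don't need to know the module at compile time:
#   apply(JobType1, :enqueue, [%{payload: %{}}])
```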
Hopefully the last one: Then maybe instead of behaviours I should just implement every job-related function (like `enqueue` and `process`) in `job.ex` for all the job types and not worry about the duplication? It wouldn’t be the exact same code, but anyway.
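Or, somewhere in between: keep the shared flow in one generic module and pass the schema module in as an argument, so only the genuinely type-specific bits live in the type modules. A sketch with made-up names (`GenericJob`, `build/1`), and the actual `Repo.insert` replaced by a stub:

```elixir
defmodule GenericJob do
  # Generic entry point: the concrete job-type module is just an argument,
  # so the shared enqueue flow is written once.
  def enqueue(type_module, params) do
    params
    |> type_module.build()
    |> insert_or_update()
  end

  # Stand-in for Repo.insert(..., on_conflict: ...) in the real app.
  defp insert_or_update(job), do: {:ok, job}
end

defmodule JobType1 do
  # Only the type-specific part lives here: this type requires a unique
  # :external_id and knows its own queue table.
  def build(%{external_id: _} = params), do: Map.put(params, :queue, "job_type_1_queue")
end
```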