Elixir-postgresql-message-queue - Pure PostgreSQL Message Queue for Elixir

Phoenix Pub/Sub is great, but I often encounter use cases where I want topic-based message passing that is also durable. In other words, once a message is sent, I want a guarantee that it will eventually reach all of its configured listeners.

Oban is very popular, but the “job” abstraction feels too heavy for me. RabbitMQ+Broadway is a great combo, but it adds complexity to the deployment.

So for one project I decided to implement a reasonably flexible message queue system using only PostgreSQL. I think I’ll want to use this again, so I’ve distilled the relevant code into a reference repo:

Usage looks a bit like this:

iex> Messaging.MessageQueueProcessor.start_link(queue: "my_queue")
iex> [%Messaging.Message{type: "Example.Event", schema_version: 1, payload: %{"one" => 1}}]
...> |> Messaging.broadcast_messages!(to_queue: "my_queue")

# In your config (e.g. config/config.exs):
config :postgresql_message_queue, PostgresqlMessageQueue.Messaging,
  broadcast_listeners: [
    {MyContext.MyMessageHandler, ["MyContext.Commands.*", "AnotherContext.Events.*"]},
    {MyLogger.EventLogger, ["*.Events.*"]}
  ]

# Inside a module that implements the Messaging.MessageHandler behaviour:
require Logger

@impl Messaging.MessageHandler
def handle_message(%Messaging.Message{
      type: "ExampleUsage.Events.Greeting",
      payload: %{"greeting" => greeting}
    }) do
  Logger.info("ExampleUsage: received greeting: #{greeting}")
end
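
The listener config above pairs each handler with wildcard message-type patterns. The post doesn't spell out the matching rules, so the sketch below assumes that "*" matches exactly one dot-separated segment of the message type. TopicMatch and its functions are hypothetical names for illustration, not part of the repo.

```elixir
defmodule TopicMatch do
  @moduledoc """
  Hypothetical helper: decides whether a message type such as
  "AnotherContext.Events.Greeting" matches a listener pattern such as
  "*.Events.*". ASSUMPTION: "*" matches exactly one dot-separated segment.
  """

  @spec matches?(String.t(), String.t()) :: boolean()
  def matches?(pattern, type) do
    match_segments(String.split(pattern, "."), String.split(type, "."))
  end

  # Both lists exhausted together: every segment matched.
  defp match_segments([], []), do: true
  # "*" consumes exactly one segment of the type.
  defp match_segments(["*" | pattern], [_ | type]), do: match_segments(pattern, type)
  # A literal segment must be identical (repeated variable forces equality).
  defp match_segments([same | pattern], [same | type]), do: match_segments(pattern, type)
  # Mismatched segment or different number of segments.
  defp match_segments(_, _), do: false

  @doc "Returns the listener modules with at least one pattern matching the type."
  @spec listeners_for([{module(), [String.t()]}], String.t()) :: [module()]
  def listeners_for(listeners, type) do
    for {module, patterns} <- listeners,
        Enum.any?(patterns, &matches?(&1, type)),
        do: module
  end
end
```

Under that assumption, "ExampleUsage.Events.Greeting" would be routed only to MyLogger.EventLogger, since it matches "*.Events.*" but neither of the MyContext.MyMessageHandler patterns.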

I’d welcome any feedback, and I hope it’s useful to someone else, or at least interesting. I’m still unsure if this would work well as a library, but I’m thinking about it.

Looks good. There’s also gmtprime/yggdrasil on GitHub, a subscription and publishing server for Elixir applications, but it’s not maintained anymore.

I wasn’t aware of yggdrasil, thanks 🙂 My main concern would be that it requires an active subscription. If a process relies on durable messaging but dies, what happens to the messages that enter the queue while it is down? I want a guarantee that those messages will be delivered to the process once it’s up again, and it looks like that kind of guarantee may not be available with yggdrasil.

What’s wrong with Oban for this use case?

> RabbitMQ+Broadway is a great combo, but it adds complexity to the deployment.

You don’t need Broadway for this; Broadway is more like a job queue.

Hey @giddie! What guarantees does this project provide regarding message order and visibility? I ask because one of the common “gotchas” with Postgres is assuming that sequences can be relied upon to act as monotonic cursors. For example:

Process A                            | Process B
BEGIN                                | BEGIN
insert nextval('my_seq')  -- gets 1  | insert nextval('my_seq')  -- gets 2
-- do something that takes time      | COMMIT
COMMIT                               |

In this scenario, row 2 becomes visible outside its transaction before row 1 does, so a consumer that reasons “I have seen the message with id 2, therefore I am caught up to 2” will miss message 1.
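
To make the failure mode concrete, here is a toy in-memory simulation (no database involved; CursorPitfall and the tick-based timeline are hypothetical). Rows are {id, committed_at} pairs, and a row becomes visible once its transaction commits. A consumer that only asks for ids above the highest one it has seen skips id 1.

```elixir
defmodule CursorPitfall do
  @moduledoc """
  Toy simulation of the sequence/visibility race. Rows are
  {id, committed_at}; a row is visible only once committed.
  Timestamps are abstract ticks, not real time.
  """

  # nextval handed A id 1 and B id 2, but B committed first (tick 5 < tick 10).
  @rows [{1, 10}, {2, 5}]

  defp visible_at(tick) do
    for {id, committed_at} <- @rows, committed_at <= tick, do: id
  end

  @doc """
  Consumer with a high-water-mark cursor: each poll only returns ids
  greater than the largest id seen so far.
  """
  def cursor_consumer(poll_ticks) do
    {seen, _cursor} =
      Enum.reduce(poll_ticks, {[], 0}, fn tick, {seen, cursor} ->
        new = visible_at(tick) |> Enum.filter(&(&1 > cursor)) |> Enum.sort()
        {seen ++ new, Enum.max([cursor | new])}
      end)

    seen
  end
end
```

Polling at tick 6 sees only id 2 and moves the cursor to 2; at tick 11 id 1 is visible, but it is below the cursor, so it is never returned.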

How does this project avoid that issue?

21 posts were split to a new topic: Postgres and guaranteed orderings

There’s quite a bit of boilerplate around each job type in Oban: you need to define a module for each kind of job you want to process. Additionally, I ran into frustrations with Oban due to the way jobs are locked for processing. In my opinion, the Lifeline plugin (which rescues orphaned jobs stuck in the executing state) should not need to exist. A failed node should cause its jobs to return to the queue quickly and automatically.

I’m not sure I follow. You’re right that Broadway isn’t strictly necessary, especially if you don’t need concurrency. It does have some interesting features, though.

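Messages are ordered by primary key and deleted from the queue as they are processed. Since the processor doesn’t track an id cursor, it just picks up whatever is still in the queue, so messages should not be missed. There are, however, no guarantees that messages are processed in strict id order.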
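
The same toy timeline, run against a consumer that takes everything still in the queue and deletes what it processes, shows why deleting on processing avoids the miss, and also why strict id order isn’t guaranteed. This is an in-memory sketch with a hypothetical module name, not the project’s actual SQL.

```elixir
defmodule DeleteOnProcess do
  @moduledoc """
  Toy model of a delete-on-process queue. Rows are {id, committed_at};
  each poll takes every row visible at that tick, in id order, and
  "deletes" it by dropping it from the remaining rows.
  """

  def consume(rows, poll_ticks) do
    {processed, _remaining} =
      Enum.reduce(poll_ticks, {[], rows}, fn tick, {processed, remaining} ->
        # Committed rows are visible; uncommitted ones stay for a later poll.
        {visible, invisible} =
          Enum.split_with(remaining, fn {_id, committed_at} -> committed_at <= tick end)

        batch = visible |> Enum.map(&elem(&1, 0)) |> Enum.sort()
        {processed ++ batch, invisible}
      end)

    processed
  end
end
```

With the rows from the race above, polling at ticks 6 and 11 processes both messages, but in the order 2 then 1.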

But if messages are inserted serially, they are guaranteed to be processed in insertion order, as long as the processor concurrency is 1 (the default). In this mode a processing error also blocks the queue: the processor will not skip to the next message, so message order is preserved.
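
A minimal sketch of the concurrency-1 behaviour: messages are handled strictly in order, and the first failure halts the run instead of skipping ahead. SerialProcessor and the :ok | {:error, reason} handler contract are assumptions for illustration, not the repo’s API.

```elixir
defmodule SerialProcessor do
  @moduledoc """
  Hypothetical concurrency-1 processor: handles messages in order and
  stops at the first failure, leaving it (and everything after it)
  in the queue so ordering is never violated.
  """

  @spec run([term()], (term() -> :ok | {:error, term()})) ::
          {:ok, [term()]} | {:blocked, term(), [term()]}
  def run(queue, handler) do
    Enum.reduce_while(queue, {:ok, []}, fn msg, {:ok, done} ->
      case handler.(msg) do
        # Success: record it and move on to the next message.
        :ok -> {:cont, {:ok, done ++ [msg]}}
        # Failure: block here, reporting the stuck message and what succeeded.
        {:error, _reason} -> {:halt, {:blocked, msg, done}}
      end
    end)
  end
end
```

For example, if message 3 fails, messages 1 and 2 are processed and message 4 waits behind message 3 rather than overtaking it.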

If processor concurrency is configured above 1, the processor infers that reordering messages is acceptable, so failed messages do not block the queue. In this case, a configured backoff function applies to individual messages rather than the whole queue: instead of blocking, failed messages are re-enqueued with metadata tracking the number of attempts and a “process_after” timestamp.
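
Per-message retry bookkeeping for the concurrency > 1 case could look roughly like this. The fields mirror the attempt count and process_after timestamp mentioned above, but the module name, struct layout and exponential backoff formula are hypothetical, not the project’s schema.

```elixir
defmodule RetryMetadata do
  @moduledoc """
  Hypothetical retry bookkeeping: a failed message is re-enqueued with an
  incremented attempt count and a process_after timestamp from a backoff
  function. Field names and the backoff formula are illustrative only.
  """

  defstruct [:id, :payload, attempts: 0, process_after: nil]

  # Example backoff: 2^attempts seconds, i.e. 2s after the first failure,
  # 4s after the second, and so on.
  def exponential_backoff(attempts), do: Integer.pow(2, attempts)

  @doc "Returns the message updated for re-enqueueing after a failure."
  def on_failure(%__MODULE__{} = msg, now, backoff \\ &exponential_backoff/1) do
    attempts = msg.attempts + 1
    %{msg | attempts: attempts, process_after: DateTime.add(now, backoff.(attempts), :second)}
  end

  @doc "A message is eligible once process_after is unset or not in the future."
  def ready?(%__MODULE__{process_after: nil}, _now), do: true
  def ready?(%__MODULE__{process_after: at}, now), do: DateTime.compare(at, now) != :gt
end
```

Keeping the delay on the message itself, rather than pausing the processor, is what lets other messages keep flowing while one is backing off.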

The QueueProcessor module documentation outlines the available configuration options.

Broadway is best for long-running, CPU-bound jobs. I tried to use it for a messaging use case where I needed to process a massive number of tiny messages, and it was not a good fit. It also imposes a lot of opinions (about how you structure your message processing) that may cause problems if your system doesn’t fit its vision.
In general, it’s just not a message queue at all. A different tool for different cases.

Well, that’s kind of my point with this project – different use cases will have different requirements, so a library that tries to cover all of them is not necessarily a good idea. But some reference code can be really handy: you can copy the bits you need and adapt them.

Well no, it’s a data stream processing framework, so for processing a message queue it’s pretty useful. It’s certainly not the only way to ingest a queue, though. A simpler GenServer for queue processing can be found in my CQRS patterns repo, and it’s pretty much a drop-in replacement for the Broadway-based processor.
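
For comparison with a Broadway producer, a bare-bones polling GenServer might look like the following. It takes a hypothetical zero-arity :process_batch function (for instance, a wrapper around the project’s batch function, whose arguments aren’t shown here) that returns how many messages it handled. The name PollingQueueProcessor and the :interval_ms option are assumptions, not the CQRS repo’s code.

```elixir
defmodule PollingQueueProcessor do
  @moduledoc """
  Hypothetical polling processor. :process_batch is a zero-arity function
  that processes one batch and returns the number of messages handled.
  """
  use GenServer

  def start_link(opts), do: GenServer.start_link(__MODULE__, opts)

  @impl GenServer
  def init(opts) do
    state = %{
      process_batch: Keyword.fetch!(opts, :process_batch),
      interval_ms: Keyword.get(opts, :interval_ms, 1_000)
    }

    # Start polling right away.
    send(self(), :poll)
    {:ok, state}
  end

  @impl GenServer
  def handle_info(:poll, state) do
    case state.process_batch.() do
      # Queue looked empty: wait before polling again.
      0 -> Process.send_after(self(), :poll, state.interval_ms)
      # Got work: poll again immediately in case more is waiting.
      _handled -> send(self(), :poll)
    end

    {:noreply, state}
  end
end
```

Draining eagerly while there is work and only sleeping when a batch comes back empty keeps latency low without hammering the database when the queue is idle.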

Ultimately, anything could call Messaging.process_message_queue_batch/2, so the world’s your oyster.

The main advantage of Broadway in this message queue example is concurrency – it decouples ingestion from processing, so one process can pull a batch of 100 messages and distribute them across a pool of 5 workers. And of course, if the work calls for it, Broadway features such as batching and pre-fetching could be applied very easily on top of this, since the hard work of building the Broadway Producer is already done.
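
Without Broadway, the same “pull one batch, fan out to a bounded pool” idea can be roughly approximated with Task.async_stream/3 from the standard library. This is a stand-in technique, not how Broadway works internally; BatchFanOut is a hypothetical name, and the default of 5 workers just mirrors the example numbers above.

```elixir
defmodule BatchFanOut do
  @moduledoc """
  Plain-Elixir analogue (not Broadway): one caller holds a batch and a
  bounded pool of tasks processes it concurrently. Results come back in
  completion order, so this only suits queues where reordering is fine.
  """

  def process_batch(messages, handler, workers \\ 5) do
    messages
    # At most `workers` handler calls run at once. Tasks are linked to the
    # caller, so a crashing handler takes the caller down in this sketch.
    |> Task.async_stream(handler, max_concurrency: workers, ordered: false)
    |> Enum.map(fn {:ok, result} -> result end)
  end
end
```

Here a batch of 100 messages would be spread over 5 concurrent tasks; Broadway adds acknowledgement, back-pressure and batching on top of this basic shape.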