Phoenix Pub/Sub is great, but I often run into use cases where I want topic-based message passing that is also durable. In other words, once a message is sent, I want a guarantee that it will eventually reach all of its configured listeners.
Oban is very popular, but the “job” abstraction feels too heavy for me. RabbitMQ+Broadway is a great combo, but it adds complexity to the deployment.
So for one project I decided to implement a reasonably flexible message queue system using only PostgreSQL. I think I’ll want to use this again, so I’ve distilled out the relevant code into a reference repo:
Usage looks a bit like this:
iex> Messaging.MessageQueueProcessor.start_link(queue: "my_queue")
iex> [%Message{type: "Example.Event", schema_version: 1, payload: %{"one" => 1}}]
...> |> Messaging.broadcast_messages!(to_queue: "my_queue")
config :postgresql_message_queue, PostgresqlMessageQueue.Messaging,
  broadcast_listeners: [
    {MyContext.MyMessageHandler, ["MyContext.Commands.*", "AnotherContext.Events.*"]},
    {MyLogger.EventLogger, ["*.Events.*"]}
  ]
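To illustrate how wildcard patterns like the ones in this config could select listeners for a message type, here is a minimal sketch. It is not taken from the repo: `Sketch.TopicPattern` and its functions are hypothetical names, and it assumes that `*` matches exactly one dot-separated segment, which may not be how the actual implementation treats wildcards.

```elixir
defmodule Sketch.TopicPattern do
  # Hypothetical helper, not part of the reference repo.
  # Assumption: "*" matches exactly one dot-separated segment.

  # Returns true when `type` matches `pattern`, compared segment by segment.
  def matches?(pattern, type) when is_binary(pattern) and is_binary(type) do
    match_segments(String.split(pattern, "."), String.split(type, "."))
  end

  # Returns the handler modules that have at least one pattern matching `type`,
  # given a list shaped like the `broadcast_listeners` config.
  def listeners_for(listeners, type) do
    for {handler, patterns} <- listeners,
        Enum.any?(patterns, &matches?(&1, type)),
        do: handler
  end

  # Both lists exhausted at the same time: full match.
  defp match_segments([], []), do: true
  # A wildcard consumes exactly one segment of the type.
  defp match_segments(["*" | ps], [_ | ts]), do: match_segments(ps, ts)
  # A literal segment must be equal in pattern and type.
  defp match_segments([seg | ps], [seg | ts]), do: match_segments(ps, ts)
  # Anything else (length mismatch or differing literal) is not a match.
  defp match_segments(_, _), do: false
end

listeners = [
  {MyContext.MyMessageHandler, ["MyContext.Commands.*", "AnotherContext.Events.*"]},
  {MyLogger.EventLogger, ["*.Events.*"]}
]

# Under the one-segment assumption, both handlers are selected for this type.
IO.inspect(Sketch.TopicPattern.listeners_for(listeners, "AnotherContext.Events.Created"))
```

Under this assumption a type with more or fewer than three segments would not match any of the patterns above. A glob-style `*` that spans several segments would be a different design choice.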
@impl Messaging.MessageHandler
def handle_message(%Messaging.Message{
      type: "ExampleUsage.Events.Greeting",
      payload: %{"greeting" => greeting}
    }) do
  Logger.info("ExampleUsage: received greeting: #{greeting}")
end
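For context, a handler clause like this lives inside a module. The following self-contained sketch shows one way that could look. `Sketch.Message` and `Sketch.MessageHandler` are stand-ins I made up so the example runs without the repo, and the catch-all clause is my own assumption: a handler subscribed through a wildcard may receive types it has no specific clause for, and I don't know how the real library expects those to be handled.

```elixir
defmodule Sketch.Message do
  # Stand-in for the repo's message struct; fields taken from the usage example.
  defstruct [:type, :schema_version, :payload]
end

defmodule Sketch.MessageHandler do
  # Hypothetical behaviour; the real callback spec may differ.
  @callback handle_message(struct()) :: any()
end

defmodule Sketch.GreetingHandler do
  @behaviour Sketch.MessageHandler
  require Logger

  @impl Sketch.MessageHandler
  def handle_message(%Sketch.Message{
        type: "ExampleUsage.Events.Greeting",
        payload: %{"greeting" => greeting}
      }) do
    # Logger.info/1 returns :ok, which becomes the clause's return value.
    Logger.info("ExampleUsage: received greeting: #{greeting}")
  end

  # Assumed catch-all: ignore message types this handler does not care about
  # instead of raising FunctionClauseError.
  def handle_message(%Sketch.Message{}), do: :ignored
end

Sketch.GreetingHandler.handle_message(%Sketch.Message{
  type: "ExampleUsage.Events.Greeting",
  schema_version: 1,
  payload: %{"greeting" => "hello"}
})
```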
I’d welcome any feedback, and I hope it’s useful to someone else, or at least interesting. I’m still unsure if this would work well as a library, but I’m thinking about it.