What's the best pattern for a pubsub event listener? GenServer seems like the wrong choice

Let’s say I have a basic Slack-like chat app. Users can post messages, like messages, reply to messages, etc. It’s built using LiveView, so when a message is created or liked I publish an event to PubSub so the live_view can handle the UI updates. That’s all fine and working, but let’s say there are side effects, like sending the user a notification of the liked message, updating the user’s point score for a liked message, or emailing the user when a message is replied to.

One solution is something like this:

defmodule Notification.EventHandler do
  use GenServer

  def start_link(), do: ...

  def init(_) do
    Phoenix.PubSub.subscribe(MyApp.PubSub, "notification_event")
    {:ok, nil}
  end

  def handle_info({:liked_message, msg}, state) do
    create_notification_here(msg)
    {:noreply, state}
  end
end

This seems ok, because at least it’s async to the user liking the message, but now all “notification_events” (let’s say those are :liked_message and :message_reply) are handled sequentially.

One way to slightly improve is to change the handle_info to:

  def handle_info({:liked_message, msg}, state) do
    Task.start(__MODULE__, :create_notification_here, [msg])
    {:noreply, state}
  end

  # Must be public (def, not defp) and match the name above: Task.start/3 calls it via apply/3
  def create_notification_here(msg), do: some_stuff_here()

Now at least the time it takes to create the notification isn’t blocking the GenServer, but each event is still being handled sequentially.

Is there a better way to concurrently handle events like this?

I do have something similar… but I separate the Listener from the event handler.

defmodule VideoStore.Core.Listener do
  use GenServer
  ...
  @impl GenServer
  def handle_info(command, _state) do
    EventHandlers.handle(command)
    {:noreply, nil}
  end
  ...
end

I also have more than one message type, and more than one pubsub: not only “notification_event”, but also identity events, emailer events, etc. And I have more than one Listener… I put a Listener in each bounded context.

For example, I have an Emailer context that reacts to the identity command register_user on a given channel. It sends an email when a new user registers.

It’s async and decoupled.
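To make the Listener/handler split concrete, here is a minimal sketch of what the handler side could look like. The message shapes (`{:register_user, user}`), the `email` key, and the return values are assumptions for illustration, not the actual VideoStore code; the idea is only that the Listener forwards whatever arrives and a plain module pattern-matches on it.

```elixir
# Hypothetical handler module: plain functions, no process state.
defmodule EventHandlers do
  # React to the (assumed) register_user command by describing the side effect.
  # A real handler would call a mailer here instead of returning a tuple.
  def handle({:register_user, %{email: email}}) do
    {:send_welcome_email, email}
  end

  # Anything this context does not care about is ignored rather than crashing
  # the Listener that forwarded it.
  def handle(_other), do: :ignored
end
```

Because the handler is just functions, it can be tested without starting a GenServer or subscribing to anything, e.g. `EventHandlers.handle({:register_user, %{email: "a@example.com"}})`.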

This seems ok, because at least it’s async to the user liking the message, but now all “notification_events” (let’s say those are :liked_message and :message_reply) are handled sequentially.

This is a fact, but whether it’s a problem or not depends on your specific use case.

For instance, it provides a free guarantee that notifications are handled in precisely the order transmitted.
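A small sketch of that ordering property, assuming a single sender (the BEAM guarantees message order between one sending process and one receiving process; it does not promise a global order across several senders). `OrderedListener` and the `{:event, n}` shape are made up for the example.

```elixir
# Hypothetical listener that just records events in the order it handles them.
defmodule OrderedListener do
  use GenServer

  @impl GenServer
  def init(_), do: {:ok, []}

  @impl GenServer
  def handle_info({:event, n}, seen), do: {:noreply, [n | seen]}

  # Return the events oldest-first.
  @impl GenServer
  def handle_call(:seen, _from, seen), do: {:reply, Enum.reverse(seen), seen}
end

{:ok, pid} = GenServer.start_link(OrderedListener, nil)

# All messages come from this one process, so they are handled in send order.
Enum.each(1..5, fn n -> send(pid, {:event, n}) end)

# The call is sent after the events, so it is answered after all of them.
[1, 2, 3, 4, 5] = GenServer.call(pid, :seen)
```

Once each event is handed off to its own Task, this property is gone: the tasks may finish in any order.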

  def handle_info({:liked_message, msg}, state) do
    Task.start(__MODULE__, :create_notification_here, [msg])
    {:noreply, state}
  end

Be careful with patterns like this - if a LOT of messages come in at once, there’s no limit on how many processes this tries to start. Consider using something like :poolboy to get parallel workers with bounded resource consumption.
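If you don’t want to pull in :poolboy, one possible alternative (not the same thing as a worker pool, just a cap on concurrent tasks) is a `Task.Supervisor` started with `:max_children`. The sketch below is only that, a sketch: `BoundedDispatch` is a hypothetical module name, and falling back to running the work inline when the cap is hit is one assumption among several reasonable choices (you could also drop or queue the event).

```elixir
# Hypothetical helper: run work in a supervised task while there is room,
# otherwise run it in the calling process so the caller slows down.
defmodule BoundedDispatch do
  def dispatch(sup, fun) when is_function(fun, 0) do
    case Task.Supervisor.start_child(sup, fun) do
      {:ok, _pid} ->
        :async

      # The supervisor already has :max_children tasks running.
      {:error, :max_children} ->
        fun.()
        :inline
    end
  end
end

# Usage inside the listener might look like this (sup started elsewhere with
# Task.Supervisor.start_link(max_children: 10)):
#
#   def handle_info({:liked_message, msg}, %{sup: sup} = state) do
#     BoundedDispatch.dispatch(sup, fn -> create_notification_here(msg) end)
#     {:noreply, state}
#   end
```

Running inline when the cap is reached means the GenServer blocks for that one event, which acts as crude backpressure instead of spawning without limit.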
