Correct way to structure PubSub broadcast data from a library

Hey all!

I’m building a library which will be listening to UDP events broadcast on the network, doing some parsing on them, and emitting the parsed events as Phoenix.PubSub broadcasts. The underlying UDP data has a number of different event types. My goal is to make it easy to handle all events emitted, or a specific type.
My question is about what the correct way to structure the pubsub broadcast to accommodate this is.

The typical PubSub structure is

PubSub.broadcast(:my_pubsub, "user:123", {:user_update, %{id: 123, name: "Shane"}})

My first pass was to do something like

PubSub.broadcast(:my_pubsub, "libraryname:udp", {:event_type_name, %{id: 123, data: "goes here"}})

But the problem is that when I subscribe to the “libraryname:udp” channel, I then need to pattern match for every single {:event_type_name, event} that might be broadcast, and have no way to easily pattern match for any event. (No way to differentiate an {:event_type_name, _} broadcast by my library from an {:event_type_name, _} broadcast by something else, unless I inject the topic name into the payload and match against that too, which seems inelegant.)

So my next attempt was to mimic the %Phoenix.Socket.Broadcast{} struct, and do this:

obj = %{topic: "libraryname:udp", event: :event_type_name, payload: %{id: 123, data: "goes here"}
PubSub.broadcast(:my_pubsub, "libraryname:udp", obj)

which allows users of the library to easily pattern match for all messages with:

def handle_info(%{topic: "libraryname:udp", event: event_type, payload: payload}, socket) do
    Logger.debug("Got event type #{event_type}, with data: #{payload}")
    {:noreply, socket}
end

or further match for specific event types by matching against the event type as well as the topic.

Now for my case, as a user of the library I’m writing, this is preferable! If I weren’t publishing the library I’d just go with it and not care. However I’m curious if this would be considered poor form for a library.

If the only thing that matters is documenting your output, then great, but I have the feeling I might be violating a norm by emitting events over a Phoenix.PubSub in an unexpected format, and I’d prefer not to have to re-write the library later with a breaking change just to do it the Right Way™. :slight_smile:

For a library I think that the best would not to publish to phoenix pubsub at all.

I guess your UDP listener is running is a process, for instance a GenServer using gen_udp. In that case, your start_link/child_spec function should accept an option called event_handler or something like that, and the user can give their own implementation.

There are different ways to do that. The simplest is when a user will give a fun, and you just call the fun with an event like %{event: :some_event, payload: %{id: 123, data: "goes here"}}. Then the user can filter the events if needed, and dispatch to their own pubsub or do whatever they want with it.

You can structure your event data as you want because there is no more norm to violate in this case.

Another possible way is a callback module. You define a behaviour and the user implements that behaviour with their own module and give the module as the option value. So you can define different callbacks like handle_event, handle_closed. This allows the behaviour to define an ìnit function so the user can return an initial state that you would pass to further callbacks and update when the callbacks also return a state (like in a GenServer handle_call when you return a new state). This may not be needed at all. Also a fun/2 can do the job, when one argument is the callback type.

Finally, depending on your architecture, the user-defined handler can be a process. Your option can accept a pid but also an atom or a via-tuple so if the user process crashes and is restarted with a new pid but the same name, you can reach it. With a process, you just send your events as messages to that user process.

I’d start with a fun though. It’s simple and goes a long way.

2 Likes

That’s an excellent point, and I had definitely looked at using a behavior, but a simple callback is even easier.

For what it’s worth I’d gone with a PubSub method for several reasons:

  1. Every user of the library is going to want to have a “get the last data received” capability. Rather than have re-implement that every time having the genserver have a latest_data method made more sense. So I’m already bundling a bit of extra functionality in.
  2. Every use-case I can think of for the data basically involves putting the data out over pubsub, and since I’m using the library in 4 different apps I really didn’t want to copy the pubsub logic to all of them…

When I thought about going with the callback/behavior I ran into the roadblock of “what if multiple processes need to consume the data”, which basically led me to re-implementing PubSub, which led me to just using Phoenix.PubSub.

But perhaps you’re right, if it’s going to be a generic library than a simple callback function is best, and copy-pasting the pubsub and caching code to every app using the library isn’t really a bad tradeoff.

Still a bit curious about the pubsub question though. The structure of pubsub messages sort of implies that any process is never really intended to be subscribing to multiple pubsub topics, since topics can’t be matched against. It’s a problem I haven’t run into before, and I’m wondering if I’m just thinking about it wrong. :slight_smile:

what if multiple processes need to consume the data

What if one consumer is a java app athat needs the even in kafka?

And what if you do not want a bottleneck gen server for the latest data but an ETS table ?

You can never cover all the cases I guess, so the best is to not close doors.

If you really want your library to force usage of Phoenix.PubSub then maybe accepting an option for the topic is ok. Or a topic prefix maybe. And then a tuple {YourLibrary, :actual_name} as the event name could work, so if the user provides a topic that other publishers publish to, there is still a way to match your library’s messages.

Edit; but I would just do both. Accept a callback where the default implementation (if no option given) is a broadcast to Phoenix.PubSub. And pass all the options given to your library when calling the callback.

1 Like

Yeah, after thinking about it a bit you make a great point. Locking in a specific way of doing things make it a much less viable library.

Also, doing both and using the tuple event name seems like a pretty perfect solution for all use-cases, I’ll give it a go.

Thanks for the help, really appreciate the ideas!

1 Like