Phoenix_pubsub_buffered - An in-memory downtime-tolerant pubsub adapter

Hi folks, looking for feedback on this new Phoenix pubsub adapter. It’s meant to be an in-memory lightweight substitute to using something like kafka or postgres WAL replication where it persists messages, and a set of cursors that subscribers can use to “catch up” on messages if they lose connection or get behind.

And by “lightweight” I mean that it doesn’t have full unlimited durability. Because it’s in-memory you get a constant-size ring buffer to hold messages. If a consuming node is disconnected from a producing node for so long that the read cursor is overwritten, when the node re-establishes a connection all subscribers on the consuming node will receive a special message

{:cursor_expired, publisher_node_name}

At that point, it is the application code’s responsibility to return to a valid state, by doing something like reloading state from a source of truth like a db or other node, etc.

Design

Producers are in charge of maintaining the set of cursors. This allows producers to “catch up” previously connected nodes, while newly connected nodes can create a new cursor and only receive messages going forward.

This brings up another edge case, if there is a network partition, and during that time, the producer restarts and loses all it’s state, including cursors. When the nodes reconnect, the consuming node will not know that it missed some messages during the outage since it will be seen as a “new node” to the producer and only get future messages.

This is a condition that this library is explicitly here to prevent. We want a guarantee that if I see message 3, that means I’ve also seen 2 and 1. There should never be gaps in messages. This allows us to use this library for streaming things like event_sourced messages where none can be missing. Subscribers can be “behind” or “eventually consistent” but they shouldn’t ever contain a state that never existed on the producer. I can’t remember what this property is called… so if someone can remind me (strict serializability?)

To solve this issue, consumers also keep a record of all the producers that have previously registered with. When they reconnect with a node they are previously registered with, they let the producer know they are registered. If the producer doesn’t have a record of that, it knows something is wrong and send the {:cursor_expired, producer_node_name} message so the consumer knows it needs to reset state.

Some future iteration concerns:

  • Even though the state is stored in a constant size ring buffer, the cursors are monotonically increasing integers. This was an optimization to not have to update all cursor values periodically, but I might look into that in the future. In practice, I believe integers don’t have a “max value” in elixir, and for modern use cases of elixir, we often deploy enough that this gets reset frequently anyway.

  • Currently I also don’t “clean-up” old sessions, so if you have a long running node, and many consuming nodes come and go over time, the number of cursors would continue to grow. In the future I may add a cleanup timer to remove cursors that are expired anway.

  • Could provide the option for producers to fsync their buffer to disk for applications that want to minimize “expired” situations due to producer crashes.

Links

Also if you have a better idea for a name, I’m open to suggestions!

8 Likes

Hey @enewbury this is super cool. Can you talk a bit about a use case or scenario where this sort of semi-persistent pubsub approach is the right balance?

1 Like

Sure, our use case is basically hot-standbys of an in-memory statemachine. The source of truth is always written to the database, but reloading all state from DB is expensive and we want to “keep up” in real time with the leader node. However, if a follower gets too far behind, it’s trivial to just reload the source of truth from db and continue replicating from there. (When we actually do a failover, we have to block writes on the leader and send a flush message through so we know a follower is fully replicated). We were doing all of this previously with the postgres WAL, but we ran into a lot of headaches and limitations for our usecase.

Basically anything where a consumer of messages can’t tolerate “not knowing” that they’ve missed messages but they have the capability to “restart” from a known good state as long as they are notified that there was a problem.

3 Likes

This is super cool. Can this library be used with Phoenix Channel with unstable clients? The clients can drop the connection due to network problem, mobile, etc. However, when the client reconnect to the same Phoenix channel, it would be nice if all missed messages will be redelivered.

Hmm, maybe not. This library just handles the fault-tolerance between distributed elixir nodes. Once it gets to a node, delivery to an external client over websocket is another thing. That’s where the adapter ends, and core phoenix pubsub takes over, which will “send” the message to all the individual subscriber processes on that local node. If one of those local processes is a channel that manages a websocket, and that socket is disconnected when a message comes in, I imagine it just drops it, or maybe the process dies along with the subscription. I doubt channels are buffering messages to websockets.
Anyone have any knowledge about that?

1 Like

Yeah, it looks like channels have no buffering and that’s a separate problem.
https://hexdocs.pm/phoenix/channels.html#resending-server-messages
So this library is appropriate for reliable pusub where the final destination is another elixir node. Any further guarantees to external clients would require buffering on the receiving node.

I see. For that to work, you would need to expose the cursor all the way to the client side for local storage and resume, or each message delivered to the client side would need to be acknowledged all the way back. Neither of them is easy.

1 Like

Sounds like an opportunity for producer to Process.monitor consumers and clean up when consumer exits?

Tho it sounds like consumers might legitimately dis/reconnect, so producer shouldn’t clean up immediately, in which case producer could Process.send_after itself with a timeout for each cursor?

Yeah, I think the interesting thing about this problem is that it’s not really about when a node disconnects that determines if it can be forgotten, since as you said, a major aspect of this is to allow for disconnects. Really what we’re trying to solve here is for when nodes become “expired”. This isn’t really based on a constant time because how quickly the buffer loops depends on the message throughput. If we really wanted a realtime way to check for expired nodes, I could always check on every message, but I’d rather not do that since it would slow down throughput slightly.

What do you think of using the built-in genserver timeout? After each “write” it would register a timeout, and once traffic slows down for enough time the timeout would fire and it would check for expired cursors and eject them.
Though the possibility of this never getting run in a busy system is real. It may just make sense to force a check every 12hrs or something. It should be almost instantaneous unless you have thousands of nodes in the cluster which seems unlikely.

I agree that the built-in genserver timeout is another great candidate for this. The point I’m trying to make is that each cursor should clean itself up. Instead of having one central “write” timeout that tries to clean up all cursors.

Which might mean each cursor is managed by its own dedicated process.