Hi folks, looking for feedback on this new Phoenix PubSub adapter. It’s meant to be a lightweight, in-memory substitute for something like Kafka or Postgres WAL replication: it retains messages along with a set of cursors that subscribers can use to “catch up” on messages if they lose their connection or fall behind.
And by “lightweight” I mean that it doesn’t offer unlimited durability. Because it’s in-memory, messages are held in a constant-size ring buffer. If a consuming node is disconnected from a producing node for so long that the message at its read cursor is overwritten, then when the node re-establishes a connection, all subscribers on the consuming node will receive a special message:
{:cursor_expired, publisher_node_name}
At that point, it is the application code’s responsibility to return to a valid state, for example by reloading state from a source of truth such as a database or another node.
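As a rough sketch of what that recovery could look like on the subscriber side, here is a GenServer that rebuilds its state when the expiry message arrives. The module name, the `:load_fun` option, and the `{:increment, n}` message are all made up for illustration; only the `{:cursor_expired, node}` tuple comes from the library.

```elixir
defmodule CounterSubscriber do
  # Hypothetical subscriber, not part of the library.
  use GenServer

  def start_link(opts), do: GenServer.start_link(__MODULE__, opts)

  @impl true
  def init(opts) do
    # load_fun stands in for "read from the database or another node".
    load_fun = Keyword.fetch!(opts, :load_fun)
    {:ok, %{count: load_fun.(), load_fun: load_fun}}
  end

  @impl true
  def handle_info({:cursor_expired, _publisher_node}, state) do
    # Messages were missed, so the local count can no longer be trusted.
    # Throw it away and reload from the source of truth.
    {:noreply, %{state | count: state.load_fun.()}}
  end

  def handle_info({:increment, n}, state) do
    # Normal path: apply the broadcast message to local state.
    {:noreply, %{state | count: state.count + n}}
  end
end
```

In a real app the process would also subscribe to a topic in `init/1`; that part is left out so the example runs on its own.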
Design
Producers are in charge of maintaining the set of cursors. This allows producers to “catch up” previously connected nodes, while newly connected nodes can create a new cursor and only receive messages going forward.
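To make the buffer and cursor relationship concrete, here is a minimal sketch of a fixed-size buffer addressed by ever-increasing sequence numbers. This is not the library’s code: `RingSketch`, its fields, and the choice of a map for slots are assumptions for illustration.

```elixir
defmodule RingSketch do
  # Illustrative only: a fixed-size buffer with monotonically increasing sequence numbers.
  defstruct size: 0, next_seq: 0, slots: %{}

  def new(size) when size > 0, do: %__MODULE__{size: size}

  # Store the message in slot rem(seq, size), overwriting whatever was there.
  def push(%__MODULE__{} = buf, msg) do
    slot = rem(buf.next_seq, buf.size)
    %{buf | slots: Map.put(buf.slots, slot, msg), next_seq: buf.next_seq + 1}
  end

  # A cursor is the sequence number of the next message the consumer needs.
  def read_from(%__MODULE__{} = buf, cursor) do
    oldest = max(buf.next_seq - buf.size, 0)

    if cursor < oldest or cursor > buf.next_seq do
      # The message at the cursor was overwritten, or (assumption) the cursor
      # is ahead of anything this buffer ever produced.
      :expired
    else
      msgs =
        for seq <- cursor..(buf.next_seq - 1)//1 do
          Map.fetch!(buf.slots, rem(seq, buf.size))
        end

      {:ok, msgs, buf.next_seq}
    end
  end
end
```

With a buffer of size 3 and five messages pushed, a consumer whose cursor is 2 can still catch up, while one whose cursor is 1 gets `:expired`.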
This brings up another edge case: suppose there is a network partition, and during that time the producer restarts and loses all its state, including cursors. When the nodes reconnect, the consuming node would not know that it missed messages during the outage, since the producer would treat it as a “new node” and only send it future messages.
This is a condition that this library is explicitly here to prevent. We want a guarantee that if I see message 3, I’ve also seen 2 and 1. There should never be gaps in messages. This allows us to use this library for streaming things like event-sourced messages where none can be missing. Subscribers can be “behind” or “eventually consistent”, but they should never hold a state that never existed on the producer. I can’t remember what this property is called, so if someone can remind me, please do (strict serializability?).
To solve this issue, consumers also keep a record of all the producers they have previously registered with. When they reconnect to a node they previously registered with, they let the producer know they are registered. If the producer doesn’t have a record of that, it knows something is wrong and sends the {:cursor_expired, producer_node_name} message so the consumer knows it needs to reset its state.
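The producer-side decision during that reconnect could be sketched as a pure function like the one below. Everything here is hypothetical (`HandshakeSketch`, `on_connect/4`, the return tuples), and the first clause, where a consumer that claims no prior registration simply starts fresh, is my own assumption rather than something the library necessarily does.

```elixir
defmodule HandshakeSketch do
  # Illustrative producer-side decision; not the library's actual code.
  # cursors: map of consumer node => cursor held by the producer.
  # previously_registered?: what the consumer claims about this producer.
  # next_seq: sequence number the producer will assign to its next message.
  def on_connect(cursors, consumer, previously_registered?, next_seq) do
    case {Map.fetch(cursors, consumer), previously_registered?} do
      {_, false} ->
        # New consumer (assumption: also a restarted one): future messages only.
        {:start, next_seq, Map.put(cursors, consumer, next_seq)}

      {{:ok, cursor}, true} ->
        # Both sides remember each other: resume from the stored cursor.
        {:resume, cursor, cursors}

      {:error, true} ->
        # The consumer remembers us but we lost its cursor, e.g. after a restart.
        # Messages may have been missed, so the consumer must reset its state.
        {:cursor_expired, next_seq, Map.put(cursors, consumer, next_seq)}
    end
  end
end
```

The third clause is the case described above: without the consumer’s own record, it would be indistinguishable from the first.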
Some future iteration concerns:
- Even though messages are stored in a constant-size ring buffer, the cursors are monotonically increasing integers. This was an optimization to avoid periodically updating all cursor values, but I might revisit it in the future. In practice, Elixir integers are arbitrary-precision and have no “max value”, and in modern Elixir deployments we tend to deploy often enough that the counter gets reset frequently anyway.
- Currently I also don’t clean up old sessions, so if you have a long-running node and many consuming nodes come and go over time, the number of cursors will keep growing. In the future I may add a cleanup timer to remove cursors that have expired anyway.
- I could provide an option for producers to fsync their buffer to disk, for applications that want to minimize “expired” situations caused by producer crashes.
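For the cleanup idea, one possible shape is a function that a periodic timer could call to drop cursors already pointing at overwritten messages. This is only a sketch under the assumption that cursors live in a map of node to sequence number; `CursorCleanupSketch` and `prune/3` are invented names.

```elixir
defmodule CursorCleanupSketch do
  # Drops cursors that point at messages the ring buffer has already overwritten.
  # cursors: map of consumer node => next sequence number that consumer needs.
  # next_seq: sequence number the producer will assign next; size: buffer capacity.
  def prune(cursors, next_seq, size) do
    oldest = max(next_seq - size, 0)

    for {node, cursor} <- cursors, cursor >= oldest, into: %{} do
      {node, cursor}
    end
  end
end
```

A consumer whose cursor was pruned and later reconnects would then look like any other consumer the producer has no record of.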
Links
- GitHub - enewbury/phoenix_pubsub_buffered: Phoenix.PubSub with at-least-once delivery
- phoenix_pubsub_buffered | Hex
- PhoenixPubSubBuffered — PhoenixPubSubBuffered v0.1.0
Also if you have a better idea for a name, I’m open to suggestions!