Stagger is a small library I wrote to provide simple, durable message queues, for a use-case where inbound data has to be reliably captured even when downstream processing might not be available. In this use-case, capture should be better than ‘in-memory only’ but didn’t warrant a full enterprise-grade message-queuing system.
The inbound data source opens and writes to the message queue (just a file)
{:ok, pid} = Stagger.open("/path/to/msg/queue")
...
:ok = Stagger.write(pid, msg)
The downstream processing is implemented as a GenStage consumer, and subscribes the producer to process the events:
use GenStage
def init(args) do
{:ok, pid} = Stagger.open("/path/to/msg/queue")
:ok = GenStage,async_subscribe(self(), to: pid)
{:consumer, pid}
end
def handle_events(events, _from, state) do
# process events
{:noreply, [], state}
end
Other possible points of interest:
- The Producer supports message recovery, purging and acknowledgement
- The Producer is implemented via a GenServer and handles the GenStage protocol directly, as I considered the buffering to be already taken care of by the file itself
- There’s a reasonably fully-featured stateful property test, via PropCheck, that puts the Producer through its paces.
- The library has no extra dependencies other than GenStage.
I expect there will be at least some point releases in the near future