Stagger - Point-to-point, durable message-queues for GenStage

Stagger is a small library I wrote to provide simple, durable message queues, for a use-case where inbound data has to be reliably captured even when downstream processing might not be available. In this use-case, capture should be better than ‘in-memory only’ but didn’t warrant a full enterprise-grade message-queuing system.

The inbound data source opens and writes to the message queue (just a file)

{:ok, pid} = Stagger.open("/path/to/msg/queue")
...
:ok = Stagger.write(pid, msg)

The downstream processing is implemented as a GenStage consumer, and subscribes the producer to process the events:

use GenStage

def init(args) do
  {:ok, pid} = Stagger.open("/path/to/msg/queue")
  :ok = GenStage,async_subscribe(self(), to: pid)
  {:consumer, pid}
end

def handle_events(events, _from, state) do
  # process events
  {:noreply, [], state}
end

Other possible points of interest:

  • The Producer supports message recovery, purging and acknowledgement
  • The Producer is implemented via a GenServer and handles the GenStage protocol directly, as I considered the buffering to be already taken care of by the file itself
  • There’s a reasonably fully-featured stateful property test, via PropCheck, that puts the Producer through its paces.
  • The library has no extra dependencies other than GenStage.

I expect there will be at least some point releases in the near future :wink:

6 Likes