Creating a Stream from an inmemory data source (queue)

I have a GenServer that wraps a queue of items that need to be persisted. There are two operating concerns with the public API - (1) cast-ing the incoming messages that may be coming from multiple processes and (2) retrieving sets of items to be written.

Items must be written in-order (items have a key, multiple writes for given key may be present in the queue at any point in time.)

My thinking here is that a flow partitioner would be a great way to break this down by key and enable parallelism. Therefore, I’d like to create a Stream interface over this data structure, but I’m not finding documentation on how to do that in elixir. Any thoughts?

Below is the source GenServer.

defmodule Bergwerk do # The "Mine"
  use GenServer

  def start_link() do
    # we keep length information because we don't want to incur a O[N] hit to get length
    state = {0,}
    GenServer.start_link(__MODULE__, state, name: __MODULE__)

  def init(args) do
    {:ok, args}

  def add_msg(type, key, hash, data, time) do
    map = %{
      :type => type,
      :key => key,
      :hash => hash,
      :data => data,
      :time => time

    GenServer.cast(__MODULE__, {:add_msg, map})

  def demand_items(count) do, {:demand_items, count})

  def handle_cast({:add_msg, map}, state) do
    {number, q} = state

    updated_queue =, q)

    {:noreply, {number + 1, updated_queue}}

  def handle_call({:demand_items, demand}, _from, state) do
    {count, queue} = state
    case demand >= count do
      true ->
        {:reply, queue, {0,}}
      false ->
        {demanded_queue, state_queue} =:queue.split(demand, queue)
        {:reply, demanded_queue, {count-demand, state_queue}}