I have a GenServer that wraps a queue of items that need to be persisted. There are two operating concerns with the public API - (1) cast-ing the incoming messages that may be coming from multiple processes and (2) retrieving sets of items to be written.
Items must be written in-order (items have a key, multiple writes for given key may be present in the queue at any point in time.)
My thinking here is that a flow partitioner would be a great way to break this down by key and enable parallelism. Therefore, I’d like to create a Stream interface over this data structure, but I’m not finding documentation on how to do that in elixir. Any thoughts?
Below is the source GenServer.
defmodule Bergwerk do # The "Mine"
use GenServer
def start_link() do
# we keep length information because we don't want to incur a O[N] hit to get length
state = {0, :queue.new()}
GenServer.start_link(__MODULE__, state, name: __MODULE__)
end
def init(args) do
{:ok, args}
end
def add_msg(type, key, hash, data, time) do
map = %{
:type => type,
:key => key,
:hash => hash,
:data => data,
:time => time
}
GenServer.cast(__MODULE__, {:add_msg, map})
end
def demand_items(count) do
GenServer.call(__MODULE__, {:demand_items, count})
end
def handle_cast({:add_msg, map}, state) do
{number, q} = state
updated_queue = :queue.in(map, q)
{:noreply, {number + 1, updated_queue}}
end
def handle_call({:demand_items, demand}, _from, state) do
{count, queue} = state
case demand >= count do
true ->
{:reply, queue, {0, :queue.new()}}
false ->
{demanded_queue, state_queue} =:queue.split(demand, queue)
{:reply, demanded_queue, {count-demand, state_queue}}
end
end
end