Feeding log table from PostgreSQL to GenStage

Hello,

In our application all create, update and delete operations on Property “model” must be propagated to ElasticSearch service. Each such operation creates an entry in the log table, from which I believe it’s possible to feed them into GenStage producer to build a reactive and fault-tolerant exporting procedure with multiple retries, error reporting, and exponential backoff.

Log table schema:

property_operations_logs(
  property_id: References,
  operation:   String,
  fails:       Integer,  
  retried_at:  DateTime,
  run_at:      DateTime,
  backtrace:   Text,
  inserted_at: DateTime
)

where status can be on of the following running or failed. Jobs are not retired after they failed more than N times.
Assuming that all necessary DB query are implemented in PropertyLog module I’m thinking about nearly the following implementation:

defmodule Producer do
  def init(:ok) do
    :timer.send_interval(5000, :append)
    {:producer, {:queue.new, 0}}
  end

  def handle_call(:append, {queue, pending_demand}) do
    # Get all available jobs: new and failed with expired cooldown time; set status to "running"; break CQS :\
    PropertyLog.enqueue_jobs 
    |> Enum.reduce(queue, fn(e, q) -> :queue.in(event, q) end)
    |> dispatch_events(pending_demand, [])
  end

  def handle_demand(incoming_demand, {queue, pending_demand}) do
    dispatch_events(queue, incoming_demand + pending_demand, [])
  end

  defp dispatch_events(queue, 0, events) do
    {:noreply, Enum.reverse(events), {queue, 0}}
  end

  defp dispatch_events(queue, demand, events) do
    case :queue.out(queue) do
      {{:value, {event}}, queue} ->
        dispatch_events(queue, demand - 1, [event | events])
      {:empty, queue} ->
        {:noreply, Enum.reverse(events), {queue, demand}}
    end
  end
end

After hat Producer is relatively simple:

defmodule Consumer do
  def handle_events(jobs, _from, state) do
    for job <- jobs do
      # Set `run_at` property, so it will be possible to distinguish jobs that weren't run due to internal system error and jobs that were run but failed due to service error.
      # Later need to have their failure counter updated
      PropertyLog.mark_started(job)
      # Perform relevant operation and update the log: either mark as failed and erase `run_at` or remove the task
      perform(job) |> update_log 
    end
    {:noreply, [], state}
  end  
end

This might be a not best locking “algorithm”, but seems to work, as far as I can see. If there are better options please let me know.
A housekeeping GenServer process for resetting jobs that weren’t executed due to Producer/Consumer crash, or some other internal error (i.e. status is running but run_at is empty) is left behind the scenes.

What to do you think folks? Does it look good? Is there anything that I missed?