Feeding log table from PostgreSQL to GenStage


In our application all create, update and delete operations on the Property “model” must be propagated to an ElasticSearch service. Each such operation creates an entry in a log table, from which I believe it’s possible to feed jobs into a GenStage producer to build a reactive and fault-tolerant exporting procedure with multiple retries, error reporting, and exponential backoff.

Log table schema:

  property_id: References,
  operation:   String,
  status:      String,
  fails:       Integer,
  retried_at:  DateTime,
  run_at:      DateTime,
  backtrace:   Text,
  inserted_at: DateTime

where status can be one of the following: running or failed. Jobs are not retried after they have failed more than N times.
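For reference, the table above could be created with an Ecto migration along these lines (the table name, the "new" default for status, and the null constraints are my assumptions, not part of the design):

defmodule MyApp.Repo.Migrations.CreatePropertyLogs do
  use Ecto.Migration

  def change do
    create table(:property_logs) do
      add :property_id, references(:properties), null: false
      add :operation,   :string, null: false
      add :status,      :string, null: false, default: "new"
      add :fails,       :integer, null: false, default: 0
      add :retried_at,  :utc_datetime
      add :run_at,      :utc_datetime
      add :backtrace,   :text
      add :inserted_at, :utc_datetime, null: false
    end
  end
end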
Assuming that all necessary DB queries are implemented in a PropertyLog module, I’m thinking about roughly the following implementation:

defmodule Producer do
  use GenStage

  def init(:ok) do
    :timer.send_interval(5000, :append)
    {:producer, {:queue.new(), 0}}
  end

  # :timer.send_interval/2 delivers :append as a plain message,
  # so it must be handled in handle_info/2, not handle_call/3.
  def handle_info(:append, {queue, pending_demand}) do
    # Get all available jobs: new and failed with expired cooldown time; set status to "running"; break CQS :\
    PropertyLog.take_available_jobs()
    |> Enum.reduce(queue, fn(event, q) -> :queue.in(event, q) end)
    |> dispatch_events(pending_demand, [])
  end

  def handle_demand(incoming_demand, {queue, pending_demand}) do
    dispatch_events(queue, incoming_demand + pending_demand, [])
  end

  defp dispatch_events(queue, 0, events) do
    {:noreply, Enum.reverse(events), {queue, 0}}
  end

  defp dispatch_events(queue, demand, events) do
    case :queue.out(queue) do
      {{:value, event}, queue} ->
        dispatch_events(queue, demand - 1, [event | events])
      {:empty, queue} ->
        {:noreply, Enum.reverse(events), {queue, demand}}
    end
  end
end
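The job-fetching call itself isn’t shown above; under the assumption from the text that the queries live in PropertyLog, it could be sketched roughly like this (take_available_jobs/0, the Log schema, Repo, @max_fails and @base_cooldown are my placeholders):

defmodule PropertyLog do
  import Ecto.Query

  @max_fails 5        # "N" from the text; value is an assumption
  @base_cooldown 30   # seconds; doubled per failure for exponential backoff

  # New jobs plus failed jobs whose cooldown has expired; marks them "running".
  def take_available_jobs do
    now = DateTime.utc_now()

    jobs =
      Repo.all(from l in Log, where: l.status in ["new", "failed"] and l.fails < @max_fails)
      |> Enum.filter(&cooled_down?(&1, now))

    ids = Enum.map(jobs, & &1.id)
    Repo.update_all(from(l in Log, where: l.id in ^ids), set: [status: "running"])
    jobs
  end

  defp cooled_down?(%Log{fails: 0}, _now), do: true
  defp cooled_down?(%Log{fails: fails, retried_at: retried_at}, now) do
    DateTime.diff(now, retried_at) >= @base_cooldown * :math.pow(2, fails)
  end
end

Filtering the cooldown in Elixir rather than SQL keeps the sketch simple; with many rows the backoff condition would be better expressed inside the query.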

After that, the Consumer is relatively simple:

defmodule Consumer do
  use GenStage

  def init(:ok), do: {:consumer, :ok, subscribe_to: [Producer]}

  def handle_events(jobs, _from, state) do
    for job <- jobs do
      # Set `run_at`, so jobs that weren't run due to an internal system error
      # can be distinguished from jobs that ran but failed due to a service
      # error; the latter need their failure counter updated.
      # Perform the relevant operation and update the log: either mark the job
      # as failed and erase `run_at`, or remove the entry on success.
      job |> perform() |> update_log()
    end
    {:noreply, [], state}
  end
end
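update_log/1 is left out above; here is a minimal sketch, assuming perform/1 returns {:ok, job} on success and {:error, job, reason} on failure (that return shape and the Repo module are my assumptions):

# On success the log entry has served its purpose and is deleted.
defp update_log({:ok, job}), do: Repo.delete!(job)

# On failure: bump the counter, clear `run_at`, store the error, and stamp
# `retried_at` so the backoff cooldown is measured from this attempt.
defp update_log({:error, job, reason}) do
  job
  |> Ecto.Changeset.change(
    status: "failed",
    fails: job.fails + 1,
    run_at: nil,
    retried_at: DateTime.utc_now() |> DateTime.truncate(:second),
    backtrace: inspect(reason)
  )
  |> Repo.update!()
end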

This might not be the best locking “algorithm”, but it seems to work, as far as I can see. If there are better options, please let me know.
A housekeeping GenServer process that resets jobs which weren’t executed due to a Producer/Consumer crash or some other internal error (i.e. status is running but run_at is empty) is left behind the scenes.
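For completeness, that housekeeping process could be sketched as a periodic GenServer (the interval and the reset_stuck_jobs/0 helper in PropertyLog are placeholders):

defmodule Housekeeper do
  use GenServer

  def start_link(_opts), do: GenServer.start_link(__MODULE__, :ok)

  def init(:ok) do
    :timer.send_interval(:timer.minutes(5), :reset_stuck)
    {:ok, nil}
  end

  def handle_info(:reset_stuck, state) do
    # Jobs stuck as "running" with an empty `run_at` were claimed by the
    # Producer but never executed (e.g. a crash); put them back in play.
    PropertyLog.reset_stuck_jobs()
    {:noreply, state}
  end
end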

What do you think, folks? Does it look good? Is there anything I missed?