Adding DB operations to queue. Repo middleware or separate service?

Hi

I want to add every insert and update operation on a number of tables to a Kafka queue for further processing.

Approaches I’ve been thinking about:

  1. Add a QueueRepo module that can be substituted for the normal Ecto Repo and that, after a successful DB insert/update, enqueues the corresponding data.

  2. A separate service like a GenServer that I manually call with the resultant data after the repo operation.
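A minimal sketch of both ideas, assuming the Kafka producer is hidden behind a plain one-argument function (sink) so the example runs without Kafka or Ecto. DbEventQueue (approach 2) and QueueRepo (approach 1) are hypothetical names, not library modules. Only insert/2 and update/2 are wrapped, so any other Repo functions you use would still have to be delegated to the real repo.

```elixir
defmodule DbEventQueue do
  # Approach 2 (hypothetical): a GenServer that receives {operation, record}
  # events after a successful repo call. `sink` stands in for the real Kafka
  # producer call, which is an assumption of this sketch.
  use GenServer

  def start_link(sink) when is_function(sink, 1) do
    GenServer.start_link(__MODULE__, sink, name: __MODULE__)
  end

  # Fire-and-forget: the caller does not wait for the event to be handled.
  def enqueue(operation, record) do
    GenServer.cast(__MODULE__, {:enqueue, operation, record})
  end

  @impl true
  def init(sink), do: {:ok, sink}

  @impl true
  def handle_cast({:enqueue, operation, record}, sink) do
    sink.({operation, record})
    {:noreply, sink}
  end
end

defmodule QueueRepo do
  # Approach 1 (hypothetical): `use QueueRepo, repo: MyApp.Repo` defines
  # insert/2 and update/2 that call the real repo and enqueue the record
  # only when the repo returns {:ok, record}.
  defmacro __using__(use_opts) do
    repo = Keyword.fetch!(use_opts, :repo)

    quote do
      def insert(changeset, opts \\ []) do
        QueueRepo.enqueue_on_ok(unquote(repo).insert(changeset, opts), :insert)
      end

      def update(changeset, opts \\ []) do
        QueueRepo.enqueue_on_ok(unquote(repo).update(changeset, opts), :update)
      end
    end
  end

  # Return the repo result unchanged; only successful writes are enqueued.
  def enqueue_on_ok({:ok, record} = result, operation) do
    DbEventQueue.enqueue(operation, record)
    result
  end

  def enqueue_on_ok(error, _operation), do: error
end
```

With approach 2 you call DbEventQueue.enqueue(:insert, record) yourself after Repo.insert; with approach 1 you define something like defmodule MyApp.QueueRepo do use QueueRepo, repo: MyApp.Repo end and call MyApp.QueueRepo.insert(changeset) instead. In both variants the write is already committed when the event is cast, so an event can be lost if the process or node dies before it reaches Kafka.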

Would love to hear some thoughts on these approaches


You can use Ecto transactions for that. You only have to put your Kafka functions at the end of the transaction block, so if they fail (return an error or raise), your database inserts/updates are rolled back.

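Done this way, the database changes are only committed if the Kafka step succeeds. Connection pooling (Poolboy or otherwise) only manages database connections, though; it does not by itself guarantee that anything reaches Kafka. And the reverse case is not covered: if the Kafka publish succeeds but the commit itself fails afterwards, the message has already been sent.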


Something along these lines:

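defmodule MyApp.Accounts do
  alias Ecto.Multi
  # MyApp.Repo and MyApp.Kafka are placeholder names for your own modules.
  alias MyApp.Repo

  def my_transaction(account_changeset, profile_changeset) do
    Multi.new()
    |> Multi.update(:update_account, account_changeset)
    |> Multi.insert(:create_profile, profile_changeset)
    # Ecto 3: the function receives the repo and the changes made so far,
    # so the saved records (with their ids) can be sent to Kafka.
    |> Multi.run(:kafka_log, fn _repo, %{update_account: account, create_profile: profile} ->
      log_on_kafka(account, profile)
    end)
    |> Repo.transaction()
  end

  ## Implementation of log_on_kafka

  defp log_on_kafka(account, profile) do
    # ... your things with the saved account and profile ...
    # MyApp.Kafka.publish/2 is a hypothetical wrapper around your Kafka
    # client, assumed to return :ok or {:error, reason}.
    case MyApp.Kafka.publish("accounts", {account, profile}) do
      :ok -> {:ok, :published}
      {:error, reason} -> {:error, reason}
    end
  end
end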

It’s important that the Multi.run function returns either {:ok, something} or {:error, something}, because that is how Repo.transaction knows whether to roll back your operation.
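For an Ecto.Multi, Repo.transaction returns {:ok, changes} when everything is committed and {:error, failed_operation, failed_value, changes_so_far} when a step fails and the transaction is rolled back. A small sketch of handling both shapes; TransactionResult.describe/1 is a hypothetical helper, and in real code you would more likely pattern match directly in a case on the result of my_transaction/2.

```elixir
defmodule TransactionResult do
  # Hypothetical helper: turns the return value of Repo.transaction/1 for an
  # Ecto.Multi into a short human-readable description.

  # Committed: `changes` maps each step name to that step's result.
  def describe({:ok, changes}) when is_map(changes) do
    steps =
      changes
      |> Map.keys()
      |> Enum.sort()
      |> Enum.map_join(", ", &Atom.to_string/1)

    "committed: " <> steps
  end

  # Rolled back: the name of the failing step and the value it returned.
  def describe({:error, failed_step, failed_value, _changes_so_far}) do
    "rolled back at #{failed_step}: #{inspect(failed_value)}"
  end
end
```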

Reference: https://hexdocs.pm/ecto/Ecto.Multi.html
