I wanted to highlight how I’m using these (the code is going to production tomorrow). Essentially, I’m getting my data records like this:
activity =
PendingDataExchange
|> where(exchange_type: ^config.exchange)
|> where([pde], pde.updated_at <= ^config.timestamp)
|> select([pde], map(pde, ~w(user_id business_id exchange_type update_type)a))
|> limit(^config.size)
|> Repo.all(timeout: config.timeout)
Later, I’m loading records based on the activity in question:
records =
load_records_sql()
|> Repo.query([activity])
|> case do
{:ok, %{rows: rows}} -> {:ok, Enum.map(rows, &hd/1)}
error -> error
end
Finally, I’m purging my records:
Repo.query(purge_activity_sql(), [activity])
The SQL statements are complex, so I’ve simplified them some, but the load_records_sql/0
and purge_activity_sql/0
functions are just functions that return a triple-quoted ("""
) string.
load_records_sql/0:
SELECT jsonb_build_object(
'update_type', e.update_type,
'customer_id', c.id,
'owner_id', bo.id,
'customer', jsonb_build_object(
'first_name', c.first_name,
'last_name', c.last_name,
'role', buc.role
),
'business', jsonb_build_object(
'name', b.name
),
'membership', jsonb_build_object(
'code', m.code,
'assigned_at', m.assigned_at
)
) AS event
FROM jsonb_to_recordset($1::jsonb) AS e(
user_id uuid, business_id uuid, update_type update_type
)
INNER JOIN users AS c ON c.id = e.user_id
INNER JOIN businesses AS b ON b.id = e.business_id
INNER JOIN memberships AS m ON m.user_id = c.id
INNER JOIN business_users AS buc ON buc.user_id = c.id AND buc.business_id = b.id
INNER JOIN business_users AS buo ON buo.business_id = b.id AND buo.role = 'manager'
INNER JOIN users AS bo ON bo.id = buo.user_id;
Note that this returns a jsonb
object for each row, which will simplify a lot of the processing moving forward. jsonb_build_record
is almost as awesome as jsonb_to_recordset
.
purge_activity_sql/0:
DELETE FROM pending_data_exchange
USING jsonb_to_recordset($1::jsonb) AS e(
user_id uuid, business_id uuid, exchange_type citext
)
WHERE e.user_id = pending_data_exchange.user_id
AND e.business_id = pending_data_exchange.business_id
AND e.exchange_type = pending_data_exchange.exchange_type;
I just wish that I could somehow build something from a Repo.query
or fragment
that would let me use it as an Ecto source (e.g., just the jsonb_to_recordset($1::jsonb) AS …
bit.