I’m doing an insert_all on a dataset derived from a CSV. Prior to attempting this, we were always upserting using {:replace, fields} on conflict. Basically, I want to extend this to optionally “skip” rows that already exist and only insert new records.
I was able to achieve this by passing the on_conflict opt a custom update query, which is built dynamically from the fields of a struct. In the macro below (edited into pseudo code), I’m taking the fields from said struct, iterating over them, and building a keyword list of dynamic queries, which look like dynamic([q], coalesce(q.some_field, fragment("EXCLUDED.some_field"))).
What this does is -
a) Updates conflicted rows with the value that is already persisted in the db (think of it as a no-op)
b) If a persisted value is nil, updates it with the excluded value (this is a very specific use case)
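To make the a) and b) semantics concrete, here is a plain-Elixir sketch with no Ecto or database involved. The module name ConflictSketch, the function merge_row/3 and the sample maps are made up for illustration; it only models what COALESCE(existing, EXCLUDED) does per field and is not the code that runs in the import.

```elixir
defmodule ConflictSketch do
  # Models COALESCE(persisted.field, EXCLUDED.field) for each listed field:
  # keep the persisted value unless it is nil, in which case take the incoming one.
  def merge_row(existing, incoming, fields) do
    Map.new(fields, fn f ->
      persisted = Map.get(existing, f)
      value = if is_nil(persisted), do: Map.get(incoming, f), else: persisted
      {f, value}
    end)
  end
end

# Hypothetical rows: `existing` is what is in the db, `incoming` is the CSV row.
existing = %{name: "Alice", email: nil}
incoming = %{name: "Bob", email: "bob@example.com"}

# name keeps the persisted value, email is filled in from the incoming row.
ConflictSketch.merge_row(existing, incoming, [:name, :email])
```

Note that only nil triggers the fallback, mirroring how COALESCE only skips NULL; a persisted false or empty string would be kept.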
So now I can turn on the setting :import_new_records and import the data from the CSV without worrying about overwriting existing records.
The code below does work, but it’s a dangerous hack. Building that dynamic fragment at runtime is extremely scary, and even though I’m guarding by only operating on a specific input, I’d really like to avoid it entirely.
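As an illustration of the kind of extra guard that could sit in front of any string interpolation, here is a small sketch. It is not something the macro currently does; the module name IdentifierGuard, the function safe_column!/1 and the allowed-character pattern are all assumptions chosen for the example.

```elixir
defmodule IdentifierGuard do
  # Returns the field name as a string if it looks like a plain column name
  # (lowercase letters, digits and underscores, not starting with a digit).
  # Raises before the name can reach a SQL string otherwise.
  def safe_column!(field) when is_atom(field) do
    name = Atom.to_string(field)

    if String.match?(name, ~r/\A[a-z_][a-z0-9_]*\z/) do
      name
    else
      raise ArgumentError, "unsafe column name: #{inspect(field)}"
    end
  end
end

# A plain column name passes through unchanged as a string.
IdentifierGuard.safe_column!(:some_field)
```

This only narrows what can be interpolated; it does not remove the runtime evaluation itself, which is the part I would like to get rid of.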
I’ve tried a million different ways to do this, but Ecto doesn’t seem to like me dynamically building these fragments. If it helps, I’m happy to supply the Postgrex errors I get when I build the fragments normally without using Code.eval_string/3, just let me know!
Any help to achieve this safely is much appreciated.
Here is what insert_all is being called with -
# Skip/fill-only mode when the setting is on, otherwise the usual replace upsert.
conflict_action =
  if get_setting(entity, :import_new_records) do
    on_conflict_update(struct)
  else
    {:replace, fields}
  end

insert_opts = [
  returning: true,
  on_conflict: conflict_action,
  timeout: @timeout,
  conflict_target: targets
]

Repo.insert_all(module, rows, insert_opts)
And here is the actual macro -
defmacro on_conflict_update(specific_struct) do
  quote bind_quoted: [specific_struct: specific_struct] do
    # Guard: only operate on the one struct type whose fields we trust.
    unless is_struct(specific_struct, SpecificStruct) do
      raise ArgumentError,
        message: "Input must be SpecificStruct: `#{inspect(specific_struct)}`"
    end

    update_args =
      specific_struct.fields
      |> Keyword.new(fn f ->
        # The hack: build the fragment source as a string and evaluate it at runtime.
        excluded =
          "dynamic([q], fragment(\"EXCLUDED.#{f}\"))"
          |> Code.eval_string(binding(), __ENV__)
          |> elem(0)

        # Keep the persisted value unless it is nil, then fall back to EXCLUDED.
        {f, dynamic([q], coalesce(field(q, ^f), ^excluded))}
      end)

    [set: update_args]
  end
end